This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 587af853fbf [feat][misc] PIP-264: Add OpenTelemetry authentication and token metrics (#23016)
587af853fbf is described below

commit 587af853fbf976d5007e17dba910a4a14e3e85e8
Author: Dragos Misca <[email protected]>
AuthorDate: Wed Aug 28 20:50:52 2024 -0700

    [feat][misc] PIP-264: Add OpenTelemetry authentication and token metrics (#23016)
---
 .../AuthenticationProviderAthenz.java              |  17 ++-
 .../AuthenticationProviderAthenzTest.java          |  14 +-
 .../oidc/AuthenticationProviderOpenID.java         |  27 ++--
 .../broker/authentication/oidc/JwksCache.java      |  26 ++--
 .../oidc/OpenIDProviderMetadataCache.java          |  24 +--
 ...uthenticationProviderOpenIDIntegrationTest.java |  20 +--
 .../oidc/AuthenticationProviderOpenIDTest.java     | 111 +++++++-------
 .../authentication/AuthenticationProviderSasl.java |   6 +
 .../authentication/SaslAuthenticateTest.java       |   4 +-
 pulsar-broker-common/pom.xml                       |   5 +
 .../authentication/AuthenticationProvider.java     |  25 ++++
 .../AuthenticationProviderBasic.java               |  17 ++-
 .../authentication/AuthenticationProviderList.java |  44 ++++--
 .../authentication/AuthenticationProviderTls.java  |  17 ++-
 .../AuthenticationProviderToken.java               |  47 +++---
 .../authentication/AuthenticationService.java      |  12 +-
 .../metrics/AuthenticationMetrics.java             |  65 ++++++++-
 .../metrics/AuthenticationMetricsToken.java        | 109 ++++++++++++++
 .../AuthenticationProviderBasicTest.java           |   8 +-
 .../AuthenticationProviderListTest.java            |   5 +-
 .../AuthenticationProviderTokenTest.java           |  73 ++++++----
 .../pulsar/broker/service/BrokerService.java       |   3 +-
 .../broker/stats/PulsarBrokerOpenTelemetry.java    |   5 +
 .../broker/stats/MetadataStoreStatsTest.java       |   4 +-
 .../OpenTelemetryAuthenticationStatsTest.java      | 161 +++++++++++++++++++++
 .../pulsar/broker/stats/PrometheusMetricsTest.java |  42 +++---
 26 files changed, 674 insertions(+), 217 deletions(-)

diff --git a/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java b/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java
index 652a922b9a5..499ebefc8a0 100644
--- a/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java
+++ b/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java
@@ -43,6 +43,8 @@ public class AuthenticationProviderAthenz implements 
AuthenticationProvider {
     private List<String> domainNameList = null;
     private int allowedOffset = 30;
 
+    private AuthenticationMetrics authenticationMetrics;
+
     public enum ErrorCode {
         UNKNOWN,
         NO_CLIENT,
@@ -54,6 +56,14 @@ public class AuthenticationProviderAthenz implements 
AuthenticationProvider {
 
     @Override
     public void initialize(ServiceConfiguration config) throws IOException {
+        initialize(Context.builder().config(config).build());
+    }
+
+    @Override
+    public void initialize(Context context) throws IOException {
+        authenticationMetrics = new 
AuthenticationMetrics(context.getOpenTelemetry(),
+                getClass().getSimpleName(), getAuthMethodName());
+        var config = context.getConfig();
         String domainNames;
         if (config.getProperty(DOMAIN_NAME_LIST) != null) {
             domainNames = (String) config.getProperty(DOMAIN_NAME_LIST);
@@ -86,6 +96,11 @@ public class AuthenticationProviderAthenz implements 
AuthenticationProvider {
         return "athenz";
     }
 
+    @Override
+    public void incrementFailureMetric(Enum<?> errorCode) {
+        authenticationMetrics.recordFailure(errorCode);
+    }
+
     @Override
     public String authenticate(AuthenticationDataSource authData) throws 
AuthenticationException {
         SocketAddress clientAddress;
@@ -141,7 +156,7 @@ public class AuthenticationProviderAthenz implements 
AuthenticationProvider {
 
                 if (token.validate(ztsPublicKey, allowedOffset, false, null)) {
                     log.debug("Athenz Role Token : {}, Authenticated for 
Client: {}", roleToken, clientAddress);
-                    
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), 
getAuthMethodName());
+                    authenticationMetrics.recordSuccess();
                     return token.getPrincipal();
                 } else {
                     errorCode = ErrorCode.INVALID_TOKEN;
diff --git a/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java b/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java
index a5211c2f814..63dcd093978 100644
--- a/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java
+++ b/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java
@@ -20,10 +20,8 @@ package org.apache.pulsar.broker.authentication;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
-
 import com.yahoo.athenz.auth.token.RoleToken;
 import com.yahoo.athenz.zpe.ZpeConsts;
-
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.file.Files;
@@ -31,9 +29,7 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
-
 import javax.naming.AuthenticationException;
-
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -55,7 +51,7 @@ public class AuthenticationProviderAthenzTest {
 
         // Initialize authentication provider
         provider = new AuthenticationProviderAthenz();
-        provider.initialize(config);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
 
         // Specify Athenz configuration file for AuthZpeClient which is used 
in AuthenticationProviderAthenz
         System.setProperty(ZpeConsts.ZPE_PROP_ATHENZ_CONF, 
"./src/test/resources/athenz.conf.test");
@@ -69,7 +65,7 @@ public class AuthenticationProviderAthenzTest {
         emptyConf.setProperties(emptyProp);
         AuthenticationProviderAthenz sysPropProvider1 = new 
AuthenticationProviderAthenz();
         try {
-            sysPropProvider1.initialize(emptyConf);
+            
sysPropProvider1.initialize(AuthenticationProvider.Context.builder().config(emptyConf).build());
             assertEquals(sysPropProvider1.getAllowedOffset(), 30); // default 
allowed offset is 30 sec
         } catch (Exception e) {
             fail("Fail to Read pulsar.athenz.domain.names from System 
Properties");
@@ -78,7 +74,7 @@ public class AuthenticationProviderAthenzTest {
         System.setProperty("pulsar.athenz.role.token_allowed_offset", "0");
         AuthenticationProviderAthenz sysPropProvider2 = new 
AuthenticationProviderAthenz();
         try {
-            sysPropProvider2.initialize(config);
+            
sysPropProvider2.initialize(AuthenticationProvider.Context.builder().config(config).build());
             assertEquals(sysPropProvider2.getAllowedOffset(), 0);
         } catch (Exception e) {
             fail("Failed to get allowed offset from system property");
@@ -87,7 +83,7 @@ public class AuthenticationProviderAthenzTest {
         System.setProperty("pulsar.athenz.role.token_allowed_offset", 
"invalid");
         AuthenticationProviderAthenz sysPropProvider3 = new 
AuthenticationProviderAthenz();
         try {
-            sysPropProvider3.initialize(config);
+            
sysPropProvider3.initialize(AuthenticationProvider.Context.builder().config(config).build());
             fail("Invalid allowed offset should not be specified");
         } catch (IOException e) {
         }
@@ -95,7 +91,7 @@ public class AuthenticationProviderAthenzTest {
         System.setProperty("pulsar.athenz.role.token_allowed_offset", "-1");
         AuthenticationProviderAthenz sysPropProvider4 = new 
AuthenticationProviderAthenz();
         try {
-            sysPropProvider4.initialize(config);
+            
sysPropProvider4.initialize(AuthenticationProvider.Context.builder().config(config).build());
             fail("Negative allowed offset should not be specified");
         } catch (IOException e) {
         }
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java
index a9d812c10b0..38f61809133 100644
--- a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java
@@ -88,8 +88,6 @@ import org.slf4j.LoggerFactory;
 public class AuthenticationProviderOpenID implements AuthenticationProvider {
     private static final Logger log = 
LoggerFactory.getLogger(AuthenticationProviderOpenID.class);
 
-    private static final String SIMPLE_NAME = 
AuthenticationProviderOpenID.class.getSimpleName();
-
     // Must match the value used by the OAuth2 Client Plugin.
     private static final String AUTH_METHOD_NAME = "token";
 
@@ -148,8 +146,18 @@ public class AuthenticationProviderOpenID implements 
AuthenticationProvider {
     private String[] allowedAudiences;
     private ApiClient k8sApiClient;
 
+    private AuthenticationMetrics authenticationMetrics;
+
     @Override
     public void initialize(ServiceConfiguration config) throws IOException {
+        initialize(Context.builder().config(config).build());
+    }
+
+    @Override
+    public void initialize(Context context) throws IOException {
+        authenticationMetrics = new 
AuthenticationMetrics(context.getOpenTelemetry(),
+                getClass().getSimpleName(), getAuthMethodName());
+        var config = context.getConfig();
         this.allowedAudiences = 
validateAllowedAudiences(getConfigValueAsSet(config, ALLOWED_AUDIENCES));
         this.roleClaim = getConfigValueAsString(config, ROLE_CLAIM, 
ROLE_CLAIM_DEFAULT);
         this.isRoleClaimNotSubject = !ROLE_CLAIM_DEFAULT.equals(roleClaim);
@@ -181,8 +189,8 @@ public class AuthenticationProviderOpenID implements 
AuthenticationProvider {
                 .build();
         httpClient = new DefaultAsyncHttpClient(clientConfig);
         k8sApiClient = fallbackDiscoveryMode != FallbackDiscoveryMode.DISABLED 
? Config.defaultClient() : null;
-        this.openIDProviderMetadataCache = new 
OpenIDProviderMetadataCache(config, httpClient, k8sApiClient);
-        this.jwksCache = new JwksCache(config, httpClient, k8sApiClient);
+        this.openIDProviderMetadataCache = new 
OpenIDProviderMetadataCache(this, config, httpClient, k8sApiClient);
+        this.jwksCache = new JwksCache(this, config, httpClient, k8sApiClient);
     }
 
     @Override
@@ -190,6 +198,11 @@ public class AuthenticationProviderOpenID implements 
AuthenticationProvider {
         return AUTH_METHOD_NAME;
     }
 
+    @Override
+    public void incrementFailureMetric(Enum<?> errorCode) {
+        authenticationMetrics.recordFailure(errorCode);
+    }
+
     /**
      * Authenticate the parameterized {@link AuthenticationDataSource} by 
verifying the issuer is an allowed issuer,
      * then retrieving the JWKS URI from the issuer, then retrieving the 
Public key from the JWKS URI, and finally
@@ -219,7 +232,7 @@ public class AuthenticationProviderOpenID implements 
AuthenticationProvider {
         return authenticateToken(token)
                 .whenComplete((jwt, e) -> {
                     if (jwt != null) {
-                        
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), 
getAuthMethodName());
+                        authenticationMetrics.recordSuccess();
                     }
                     // Failure metrics are incremented within methods above
                 });
@@ -463,10 +476,6 @@ public class AuthenticationProviderOpenID implements 
AuthenticationProvider {
         }
     }
 
-    static void incrementFailureMetric(AuthenticationExceptionCode code) {
-        AuthenticationMetrics.authenticateFailure(SIMPLE_NAME, 
AUTH_METHOD_NAME, code);
-    }
-
     /**
      * Validate the configured allow list of allowedIssuers. The 
allowedIssuers set must be nonempty in order for
      * the plugin to authenticate any token. Thus, it fails initialization if 
the configuration is
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
index 73934e9c1e0..c88661c39c6 100644
--- a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.authentication.oidc;
 
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY;
 import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS;
 import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS_DEFAULT;
 import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_REFRESH_AFTER_WRITE_SECONDS;
@@ -26,7 +27,6 @@ import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProvide
 import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE_DEFAULT;
 import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.KEY_ID_CACHE_MISS_REFRESH_SECONDS;
 import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.KEY_ID_CACHE_MISS_REFRESH_SECONDS_DEFAULT;
-import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.incrementFailureMetric;
 import static 
org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsInt;
 import com.auth0.jwk.Jwk;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import javax.naming.AuthenticationException;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.asynchttpclient.AsyncHttpClient;
 
 public class JwksCache {
@@ -60,8 +61,11 @@ public class JwksCache {
     private final ObjectReader reader = new 
ObjectMapper().readerFor(HashMap.class);
     private final AsyncHttpClient httpClient;
     private final OpenidApi openidApi;
+    private final AuthenticationProvider authenticationProvider;
 
-    JwksCache(ServiceConfiguration config, AsyncHttpClient httpClient, 
ApiClient apiClient) throws IOException {
+    JwksCache(AuthenticationProvider authenticationProvider, 
ServiceConfiguration config,
+              AsyncHttpClient httpClient, ApiClient apiClient) throws 
IOException {
+        this.authenticationProvider = authenticationProvider;
         // Store the clients
         this.httpClient = httpClient;
         this.openidApi = apiClient != null ? new OpenidApi(apiClient) : null;
@@ -91,7 +95,7 @@ public class JwksCache {
 
     CompletableFuture<Jwk> getJwk(String jwksUri, String keyId) {
         if (jwksUri == null) {
-            
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+            
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
             return CompletableFuture.failedFuture(new 
IllegalArgumentException("jwksUri must not be null."));
         }
         return getJwkAndMaybeReload(Optional.of(jwksUri), keyId, false);
@@ -139,10 +143,10 @@ public class JwksCache {
                                 
reader.readValue(result.getResponseBodyAsBytes());
                         future.complete(convertToJwks(jwksUri, jwks));
                     } catch (AuthenticationException e) {
-                        
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+                        
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
                         future.completeExceptionally(e);
                     } catch (Exception e) {
-                        
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+                        
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
                         future.completeExceptionally(new 
AuthenticationException(
                                 "Error retrieving public key at " + jwksUri + 
": " + e.getMessage()));
                     }
@@ -152,7 +156,7 @@ public class JwksCache {
 
     CompletableFuture<Jwk> getJwkFromKubernetesApiServer(String keyId) {
         if (openidApi == null) {
-            
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+            
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
             return CompletableFuture.failedFuture(new AuthenticationException(
                     "Failed to retrieve public key from Kubernetes API server: 
Kubernetes fallback is not enabled."));
         }
@@ -165,7 +169,7 @@ public class JwksCache {
             openidApi.getServiceAccountIssuerOpenIDKeysetAsync(new 
ApiCallback<String>() {
                 @Override
                 public void onFailure(ApiException e, int statusCode, 
Map<String, List<String>> responseHeaders) {
-                    
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+                    
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
                     // We want the message and responseBody here: 
https://github.com/kubernetes-client/java/issues/2066.
                     future.completeExceptionally(
                             new AuthenticationException("Failed to retrieve 
public key from Kubernetes API server. "
@@ -178,10 +182,10 @@ public class JwksCache {
                         HashMap<String, Object> jwks = 
reader.readValue(result);
                         future.complete(convertToJwks("Kubernetes API server", 
jwks));
                     } catch (AuthenticationException e) {
-                        
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+                        
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
                         future.completeExceptionally(e);
                     } catch (Exception e) {
-                        
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+                        
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
                         future.completeExceptionally(new 
AuthenticationException(
                                 "Error retrieving public key at Kubernetes API 
server: " + e.getMessage()));
                     }
@@ -198,7 +202,7 @@ public class JwksCache {
                 }
             });
         } catch (ApiException e) {
-            
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+            
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
             future.completeExceptionally(
                     new AuthenticationException("Failed to retrieve public key 
from Kubernetes API server: "
                             + e.getMessage()));
@@ -212,7 +216,7 @@ public class JwksCache {
                 return jwk;
             }
         }
-        
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+        
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
         throw new IllegalArgumentException("No JWK found for Key ID " + keyId);
     }
 
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
index 111399adbd7..cffa52b00aa 100644
--- a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
@@ -18,13 +18,13 @@
  */
 package org.apache.pulsar.broker.authentication.oidc;
 
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA;
 import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS;
 import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS_DEFAULT;
 import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_REFRESH_AFTER_WRITE_SECONDS;
 import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_REFRESH_AFTER_WRITE_SECONDS_DEFAULT;
 import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE;
 import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE_DEFAULT;
-import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.incrementFailureMetric;
 import static 
org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsInt;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
 import javax.annotation.Nonnull;
 import javax.naming.AuthenticationException;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.asynchttpclient.AsyncHttpClient;
 
 /**
@@ -51,13 +52,16 @@ import org.asynchttpclient.AsyncHttpClient;
 class OpenIDProviderMetadataCache {
 
     private final ObjectReader reader = new 
ObjectMapper().readerFor(OpenIDProviderMetadata.class);
+    private final AuthenticationProvider authenticationProvider;
     private final AsyncHttpClient httpClient;
     private final WellKnownApi wellKnownApi;
     private final AsyncLoadingCache<Optional<String>, OpenIDProviderMetadata> 
cache;
     private static final String WELL_KNOWN_OPENID_CONFIG = 
".well-known/openid-configuration";
     private static final String SLASH_WELL_KNOWN_OPENID_CONFIG = "/" + 
WELL_KNOWN_OPENID_CONFIG;
 
-    OpenIDProviderMetadataCache(ServiceConfiguration config, AsyncHttpClient 
httpClient, ApiClient apiClient) {
+    OpenIDProviderMetadataCache(AuthenticationProvider authenticationProvider, 
ServiceConfiguration config,
+                                AsyncHttpClient httpClient, ApiClient 
apiClient) {
+        this.authenticationProvider = authenticationProvider;
         int maxSize = getConfigValueAsInt(config, CACHE_SIZE, 
CACHE_SIZE_DEFAULT);
         int refreshAfterWriteSeconds = getConfigValueAsInt(config, 
CACHE_REFRESH_AFTER_WRITE_SECONDS,
                 CACHE_REFRESH_AFTER_WRITE_SECONDS_DEFAULT);
@@ -124,10 +128,10 @@ class OpenIDProviderMetadataCache {
                         verifyIssuer(issuer, openIDProviderMetadata, false);
                         future.complete(openIDProviderMetadata);
                     } catch (AuthenticationException e) {
-                        
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+                        
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PROVIDER_METADATA);
                         future.completeExceptionally(e);
                     } catch (Exception e) {
-                        
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+                        
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PROVIDER_METADATA);
                         future.completeExceptionally(new 
AuthenticationException(
                                 "Error retrieving OpenID Provider Metadata at 
" + issuer + ": " + e.getMessage()));
                     }
@@ -151,7 +155,7 @@ class OpenIDProviderMetadataCache {
                 verifyIssuer(issClaim, openIDProviderMetadata, true);
                 future.complete(openIDProviderMetadata);
             } catch (AuthenticationException e) {
-                
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+                
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PROVIDER_METADATA);
                 future.completeExceptionally(e);
             }
             return future;
@@ -164,7 +168,7 @@ class OpenIDProviderMetadataCache {
             wellKnownApi.getServiceAccountIssuerOpenIDConfigurationAsync(new 
ApiCallback<>() {
                 @Override
                 public void onFailure(ApiException e, int statusCode, 
Map<String, List<String>> responseHeaders) {
-                    
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+                    
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PROVIDER_METADATA);
                     // We want the message and responseBody here: 
https://github.com/kubernetes-client/java/issues/2066.
                     future.completeExceptionally(new AuthenticationException(
                             "Error retrieving OpenID Provider Metadata from 
Kubernetes API server. Message: "
@@ -179,7 +183,7 @@ class OpenIDProviderMetadataCache {
                         OpenIDProviderMetadata openIDProviderMetadata = 
reader.readValue(result);
                         future.complete(openIDProviderMetadata);
                     } catch (Exception e) {
-                        
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+                        
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PROVIDER_METADATA);
                         future.completeExceptionally(new 
AuthenticationException(
                                 "Error retrieving OpenID Provider Metadata 
from Kubernetes API Server: "
                                         + e.getMessage()));
@@ -197,7 +201,7 @@ class OpenIDProviderMetadataCache {
                 }
             });
         } catch (ApiException e) {
-            
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+            
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PROVIDER_METADATA);
             future.completeExceptionally(new AuthenticationException(
                     "Error retrieving OpenID Provider Metadata from Kubernetes 
API server: " + e.getMessage()));
         }
@@ -221,10 +225,10 @@ class OpenIDProviderMetadataCache {
                               boolean isK8s) throws AuthenticationException {
         if (!issuer.equals(metadata.getIssuer())) {
             if (isK8s) {
-                
incrementFailureMetric(AuthenticationExceptionCode.UNSUPPORTED_ISSUER);
+                
authenticationProvider.incrementFailureMetric(AuthenticationExceptionCode.UNSUPPORTED_ISSUER);
                 throw new AuthenticationException("Issuer not allowed: " + 
issuer);
             } else {
-                
incrementFailureMetric(AuthenticationExceptionCode.ISSUER_MISMATCH);
+                
authenticationProvider.incrementFailureMetric(AuthenticationExceptionCode.ISSUER_MISMATCH);
                 throw new AuthenticationException(String.format("Issuer URL 
mismatch: [%s] should match [%s]",
                         issuer, metadata.getIssuer()));
             }
diff --git a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
index e11fd8395a5..f4663a9ee3c 100644
--- a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
+++ b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
@@ -260,7 +260,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
         Files.write(Path.of(System.getenv("KUBECONFIG")), 
kubeConfig.getBytes());
 
         provider = new AuthenticationProviderOpenID();
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
     }
 
     @AfterClass
@@ -358,7 +358,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
 
         @Cleanup
         AuthenticationProviderOpenID provider = new 
AuthenticationProviderOpenID();
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         String role = "superuser";
         String token = generateToken(validJwk, issuerWithMissingKid, role, 
"allowed-audience", 0L, 0L, 10000L);
@@ -379,7 +379,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
 
         @Cleanup
         AuthenticationProviderOpenID provider = new 
AuthenticationProviderOpenID();
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         String role = "superuser";
         String token = generateToken(validJwk, issuerWithMissingKid, role, 
"allowed-audience", 0L, 0L, 10000L);
@@ -407,7 +407,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
 
         @Cleanup
         AuthenticationProviderOpenID provider = new 
AuthenticationProviderOpenID();
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         String role = "superuser";
         // We use the normal issuer on the token because the /k8s endpoint is 
configured via the kube config file
@@ -441,7 +441,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
 
         @Cleanup
         AuthenticationProviderOpenID provider = new 
AuthenticationProviderOpenID();
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         String role = "superuser";
         String token = generateToken(validJwk, "http://not-the-k8s-issuer", 
role, "allowed-audience", 0L, 0L, 10000L);
@@ -468,7 +468,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
 
         @Cleanup
         AuthenticationProviderOpenID provider = new 
AuthenticationProviderOpenID();
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         String role = "superuser";
         String token = generateToken(validJwk, issuer, role, 
"allowed-audience", 0L, 0L, 10000L);
@@ -499,7 +499,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
 
         @Cleanup
         AuthenticationProviderOpenID provider = new 
AuthenticationProviderOpenID();
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         String role = "superuser";
         String token = generateToken(validJwk, "http://not-the-k8s-issuer", 
role, "allowed-audience", 0L, 0L, 10000L);
@@ -562,7 +562,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
         
props.setProperty(AuthenticationProviderOpenID.ACCEPTED_TIME_LEEWAY_SECONDS, 
"10");
         @Cleanup
         AuthenticationProviderOpenID provider = new 
AuthenticationProviderOpenID();
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         String role = "superuser";
         String token = generateToken(validJwk, issuer, role, 
"allowed-audience", 0L, 0L, 0L);
@@ -635,7 +635,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
         
props.setProperty(AuthenticationProviderOpenID.ISSUER_TRUST_CERTS_FILE_PATH, 
caCert);
         ServiceConfiguration config = new ServiceConfiguration();
         config.setProperties(props);
-        provider.initialize(config);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
 
         // Build a JWT with a custom claim
         HashMap<String, Object> claims = new HashMap();
@@ -656,7 +656,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
         
props.setProperty(AuthenticationProviderOpenID.ISSUER_TRUST_CERTS_FILE_PATH, 
caCert);
         ServiceConfiguration config = new ServiceConfiguration();
         config.setProperties(props);
-        provider.initialize(config);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
 
         // Build a JWT without the "test" claim, which should cause the 
authentication to fail
         String token = generateToken(validJwk, issuer, "not-my-role", 
"allowed-audience", 0L,
diff --git 
a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDTest.java
 
b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDTest.java
index f5bb584d16f..4a12f61528a 100644
--- 
a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDTest.java
+++ 
b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.authentication.oidc;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.testng.Assert.assertNull;
 import com.auth0.jwt.JWT;
 import com.auth0.jwt.interfaces.DecodedJWT;
@@ -29,9 +30,9 @@ import java.security.KeyPair;
 import java.sql.Date;
 import java.time.Instant;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
@@ -39,6 +40,7 @@ import javax.naming.AuthenticationException;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -59,16 +61,31 @@ import org.testng.annotations.Test;
 public class AuthenticationProviderOpenIDTest {
 
     // https://www.rfc-editor.org/rfc/rfc7518#section-3.1
+    private static final Set<SignatureAlgorithm> SUPPORTED_ALGORITHMS = Set.of(
+            SignatureAlgorithm.RS256,
+            SignatureAlgorithm.RS384,
+            SignatureAlgorithm.RS512,
+            SignatureAlgorithm.ES256,
+            SignatureAlgorithm.ES384,
+            SignatureAlgorithm.ES512
+    );
+
     @DataProvider(name = "supportedAlgorithms")
     public static Object[][] supportedAlgorithms() {
-        return new Object[][] {
-                { SignatureAlgorithm.RS256 },
-                { SignatureAlgorithm.RS384 },
-                { SignatureAlgorithm.RS512 },
-                { SignatureAlgorithm.ES256 },
-                { SignatureAlgorithm.ES384 },
-                { SignatureAlgorithm.ES512 }
-        };
+        return buildDataProvider(SUPPORTED_ALGORITHMS);
+    }
+
+    @DataProvider(name = "unsupportedAlgorithms")
+    public static Object[][] unsupportedAlgorithms() {
+        var unsupportedAlgorithms = Set.of(SignatureAlgorithm.values())
+                .stream()
+                .filter(alg -> !SUPPORTED_ALGORITHMS.contains(alg))
+                .toList();
+        return buildDataProvider(unsupportedAlgorithms);
+    }
+
+    private static Object[][] buildDataProvider(Collection<?> collection) {
+        return collection.stream().map(o -> new Object[] { o 
}).toArray(Object[][]::new);
     }
 
     // Provider to use in common tests that are not verifying the 
configuration of the provider itself.
@@ -83,7 +100,7 @@ public class AuthenticationProviderOpenIDTest {
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
         basicProvider = new AuthenticationProviderOpenID();
-        basicProvider.initialize(conf);
+        
basicProvider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
     }
 
     @AfterClass
@@ -100,29 +117,19 @@ public class AuthenticationProviderOpenIDTest {
     }
 
     @Test
-    public void testThatNullAlgFails() throws IOException {
-        @Cleanup
-        AuthenticationProviderOpenID provider = new 
AuthenticationProviderOpenID();
-        Assert.assertThrows(AuthenticationException.class,
-                () -> provider.verifyJWT(null, null, null));
+    public void testThatNullAlgFails() {
+        assertThatThrownBy(() -> basicProvider.verifyJWT(null, null, null))
+                .isInstanceOf(AuthenticationException.class)
+                .hasMessage("PublicKey algorithm cannot be null");
     }
 
-    @Test
-    public void testThatUnsupportedAlgsThrowExceptions() {
-        Set<SignatureAlgorithm> unsupportedAlgs = new 
HashSet<>(Set.of(SignatureAlgorithm.values()));
-        Arrays.stream(supportedAlgorithms()).map(o -> (SignatureAlgorithm) 
o[0]).toList()
-                .forEach(unsupportedAlgs::remove);
-        unsupportedAlgs.forEach(unsupportedAlg -> {
-            try {
-                @Cleanup
-                AuthenticationProviderOpenID provider = new 
AuthenticationProviderOpenID();
-                // We don't create a public key because it's irrelevant
-                Assert.assertThrows(AuthenticationException.class,
-                        () -> provider.verifyJWT(null, 
unsupportedAlg.getValue(), null));
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        });
+    @Test(dataProvider = "unsupportedAlgorithms")
+    public void testThatUnsupportedAlgsThrowExceptions(SignatureAlgorithm 
unsupportedAlg) {
+        var algorithm = unsupportedAlg.getValue();
+        // We don't create a public key because it's irrelevant
+        assertThatThrownBy(() -> basicProvider.verifyJWT(null, algorithm, 
null))
+                .isInstanceOf(AuthenticationException.class)
+                .hasMessage("Unsupported algorithm: " + algorithm);
     }
 
     @Test(dataProvider = "supportedAlgorithms")
@@ -141,29 +148,27 @@ public class AuthenticationProviderOpenIDTest {
     @Test
     public void 
testThatSupportedAlgWithMismatchedPublicKeyFromDifferentAlgFamilyFails() throws 
IOException {
         KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
-        @Cleanup
-        AuthenticationProviderOpenID provider = new 
AuthenticationProviderOpenID();
         DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
         addValidMandatoryClaims(defaultJwtBuilder, basicProviderAudience);
         defaultJwtBuilder.signWith(keyPair.getPrivate());
         DecodedJWT jwt = JWT.decode(defaultJwtBuilder.compact());
         // Choose a different algorithm from a different alg family
-        Assert.assertThrows(AuthenticationException.class,
-                () -> provider.verifyJWT(keyPair.getPublic(), 
SignatureAlgorithm.ES512.getValue(), jwt));
+        assertThatThrownBy(() -> basicProvider.verifyJWT(keyPair.getPublic(), 
SignatureAlgorithm.ES512.getValue(), jwt))
+                .isInstanceOf(AuthenticationException.class)
+                .hasMessage("Expected PublicKey alg [ES512] does match actual 
alg.");
     }
 
     @Test
-    public void 
testThatSupportedAlgWithMismatchedPublicKeyFromSameAlgFamilyFails() throws 
IOException {
+    public void 
testThatSupportedAlgWithMismatchedPublicKeyFromSameAlgFamilyFails() {
         KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
-        @Cleanup
-        AuthenticationProviderOpenID provider = new 
AuthenticationProviderOpenID();
         DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
         addValidMandatoryClaims(defaultJwtBuilder, basicProviderAudience);
         defaultJwtBuilder.signWith(keyPair.getPrivate());
         DecodedJWT jwt = JWT.decode(defaultJwtBuilder.compact());
         // Choose a different algorithm but within the same alg family as above
-        Assert.assertThrows(AuthenticationException.class,
-                () -> provider.verifyJWT(keyPair.getPublic(), 
SignatureAlgorithm.RS512.getValue(), jwt));
+        assertThatThrownBy(() -> basicProvider.verifyJWT(keyPair.getPublic(), 
SignatureAlgorithm.RS512.getValue(), jwt))
+                .isInstanceOf(AuthenticationException.class)
+                .hasMessageStartingWith("JWT algorithm does not match Public 
Key algorithm");
     }
 
     @Test
@@ -217,7 +222,7 @@ public class AuthenticationProviderOpenIDTest {
         props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, 
"https://localhost:8080");
         ServiceConfiguration config = new ServiceConfiguration();
         config.setProperties(props);
-        provider.initialize(config);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
 
         // Build the JWT with an only recently expired token
         DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
@@ -244,7 +249,8 @@ public class AuthenticationProviderOpenIDTest {
         props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, 
"");
         ServiceConfiguration config = new ServiceConfiguration();
         config.setProperties(props);
-        Assert.assertThrows(IllegalArgumentException.class, () -> 
provider.initialize(config));
+        Assert.assertThrows(IllegalArgumentException.class,
+                () -> 
provider.initialize(AuthenticationProvider.Context.builder().config(config).build()));
     }
 
     @Test
@@ -256,7 +262,8 @@ public class AuthenticationProviderOpenIDTest {
         
props.setProperty(AuthenticationProviderOpenID.FALLBACK_DISCOVERY_MODE, 
"DISABLED");
         ServiceConfiguration config = new ServiceConfiguration();
         config.setProperties(props);
-        Assert.assertThrows(IllegalArgumentException.class, () -> 
provider.initialize(config));
+        Assert.assertThrows(IllegalArgumentException.class,
+                () -> 
provider.initialize(AuthenticationProvider.Context.builder().config(config).build()));
     }
 
     @Test
@@ -269,7 +276,7 @@ public class AuthenticationProviderOpenIDTest {
         
props.setProperty(AuthenticationProviderOpenID.FALLBACK_DISCOVERY_MODE, 
"KUBERNETES_DISCOVER_TRUSTED_ISSUER");
         ServiceConfiguration config = new ServiceConfiguration();
         config.setProperties(props);
-        provider.initialize(config);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
     }
 
     @Test
@@ -282,7 +289,7 @@ public class AuthenticationProviderOpenIDTest {
         
props.setProperty(AuthenticationProviderOpenID.FALLBACK_DISCOVERY_MODE, 
"KUBERNETES_DISCOVER_PUBLIC_KEYS");
         ServiceConfiguration config = new ServiceConfiguration();
         config.setProperties(props);
-        provider.initialize(config);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
     }
 
     @Test
@@ -292,7 +299,8 @@ public class AuthenticationProviderOpenIDTest {
         ServiceConfiguration config = new ServiceConfiguration();
         // Make sure this still defaults to null.
         
assertNull(config.getProperties().get(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS));
-        Assert.assertThrows(IllegalArgumentException.class, () -> 
provider.initialize(config));
+        Assert.assertThrows(IllegalArgumentException.class,
+                () -> 
provider.initialize(AuthenticationProvider.Context.builder().config(config).build()));
     }
 
     @Test
@@ -303,7 +311,8 @@ public class AuthenticationProviderOpenIDTest {
         props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, 
"https://myissuer.com,http://myissuer.com");
         ServiceConfiguration config = new ServiceConfiguration();
         config.setProperties(props);
-        Assert.assertThrows(IllegalArgumentException.class, () -> 
provider.initialize(config));
+        Assert.assertThrows(IllegalArgumentException.class,
+                () -> 
provider.initialize(AuthenticationProvider.Context.builder().config(config).build()));
     }
 
     @Test void ensureMissingRoleClaimReturnsNull() throws Exception {
@@ -325,7 +334,7 @@ public class AuthenticationProviderOpenIDTest {
         props.setProperty(AuthenticationProviderOpenID.ROLE_CLAIM, "sub");
         ServiceConfiguration config = new ServiceConfiguration();
         config.setProperties(props);
-        provider.initialize(config);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
 
         // Build an empty JWT
         DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
@@ -345,7 +354,7 @@ public class AuthenticationProviderOpenIDTest {
         props.setProperty(AuthenticationProviderOpenID.ROLE_CLAIM, "roles");
         ServiceConfiguration config = new ServiceConfiguration();
         config.setProperties(props);
-        provider.initialize(config);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
 
         // Build an empty JWT
         DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
@@ -367,7 +376,7 @@ public class AuthenticationProviderOpenIDTest {
         props.setProperty(AuthenticationProviderOpenID.ROLE_CLAIM, "roles");
         ServiceConfiguration config = new ServiceConfiguration();
         config.setProperties(props);
-        provider.initialize(config);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
 
         // Build an empty JWT
         DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
@@ -389,7 +398,7 @@ public class AuthenticationProviderOpenIDTest {
         props.setProperty(AuthenticationProviderOpenID.ROLE_CLAIM, "roles");
         ServiceConfiguration config = new ServiceConfiguration();
         config.setProperties(props);
-        provider.initialize(config);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
 
         // Build an empty JWT
         DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
diff --git 
a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
 
b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
index 2616f90c664..f8841193ba2 100644
--- 
a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
+++ 
b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
@@ -78,6 +78,12 @@ public class AuthenticationProviderSasl implements 
AuthenticationProvider {
 
     @Override
     public void initialize(ServiceConfiguration config) throws IOException {
+        initialize(Context.builder().config(config).build());
+    }
+
+    @Override
+    public void initialize(Context context) throws IOException {
+        var config = context.getConfig();
         this.configuration = new HashMap<>();
         final String allowedIdsPatternRegExp = 
config.getSaslJaasClientAllowedIds();
         configuration.put(JAAS_CLIENT_ALLOWED_IDS, allowedIdsPatternRegExp);
diff --git 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index ae282a49dc3..226ec15d33a 100644
--- 
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++ 
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -312,7 +312,7 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
         AuthenticationProviderSasl saslServer = new 
AuthenticationProviderSasl();
         // The cache expiration time is set to 50ms. Residual auth info should 
be cleaned up
         conf.setInflightSaslContextExpiryMs(50);
-        saslServer.initialize(conf);
+        
saslServer.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         HttpServletRequest servletRequest = mock(HttpServletRequest.class);
         doReturn("Init").when(servletRequest).getHeader("State");
@@ -360,7 +360,7 @@ public class SaslAuthenticateTest extends 
ProducerConsumerBase {
         doReturn("Init").when(servletRequest).getHeader("State");
         conf.setInflightSaslContextExpiryMs(Integer.MAX_VALUE);
         conf.setMaxInflightSaslContext(1);
-        saslServer.initialize(conf);
+        
saslServer.initialize(AuthenticationProvider.Context.builder().config(conf).build());
         // add 10 inflight sasl context
         for (int i = 0; i < 10; i++) {
             AuthenticationDataProvider dataProvider =  
authSasl.getAuthData("localhost");
diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml
index 713ae538d7d..b04d08c6c8f 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-broker-common/pom.xml
@@ -49,6 +49,11 @@
       <artifactId>simpleclient_jetty</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-api</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>javax.servlet</groupId>
       <artifactId>javax.servlet-api</artifactId>
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
index 7862a35b5e8..d0a3a487b34 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.authentication;
 
 import static 
org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedDataAttributeName;
 import static 
org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedRoleAttributeName;
+import io.opentelemetry.api.OpenTelemetry;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.SocketAddress;
@@ -29,6 +30,8 @@ import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import lombok.Builder;
+import lombok.Value;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
 import org.apache.pulsar.common.api.AuthData;
@@ -47,8 +50,30 @@ public interface AuthenticationProvider extends Closeable {
      * @throws IOException
      *             if the initialization fails
      */
+    @Deprecated(since = "3.4.0")
     void initialize(ServiceConfiguration config) throws IOException;
 
+    @Builder
+    @Value
+    class Context {
+        ServiceConfiguration config;
+
+        @Builder.Default
+        OpenTelemetry openTelemetry = OpenTelemetry.noop();
+    }
+
+    /**
+     * Perform initialization for the authentication provider.
+     *
+     * @param context
+     *            the authentication provider context
+     * @throws IOException
+     *             if the initialization fails
+     */
+    default void initialize(Context context) throws IOException {
+        initialize(context.getConfig());
+    }
+
     /**
      * @return the authentication method name supported by this provider
      */
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java
index ca5150c9bdb..91bf56a071c 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java
@@ -46,6 +46,8 @@ public class AuthenticationProviderBasic implements 
AuthenticationProvider {
     private static final String CONF_PULSAR_PROPERTY_KEY = "basicAuthConf";
     private Map<String, String> users;
 
+    private AuthenticationMetrics authenticationMetrics;
+
     private enum ErrorCode {
         UNKNOWN,
         EMPTY_AUTH_DATA,
@@ -75,6 +77,14 @@ public class AuthenticationProviderBasic implements 
AuthenticationProvider {
 
     @Override
     public void initialize(ServiceConfiguration config) throws IOException {
+        initialize(Context.builder().config(config).build());
+    }
+
+    @Override
+    public void initialize(Context context) throws IOException {
+        authenticationMetrics = new 
AuthenticationMetrics(context.getOpenTelemetry(),
+                getClass().getSimpleName(), getAuthMethodName());
+        var config = context.getConfig();
         String data = 
config.getProperties().getProperty(CONF_PULSAR_PROPERTY_KEY);
         if (StringUtils.isEmpty(data)) {
             data = System.getProperty(CONF_SYSTEM_PROPERTY_KEY);
@@ -106,6 +116,11 @@ public class AuthenticationProviderBasic implements 
AuthenticationProvider {
         return "basic";
     }
 
+    @Override
+    public void incrementFailureMetric(Enum<?> errorCode) {
+        authenticationMetrics.recordFailure(errorCode);
+    }
+
     @Override
     public String authenticate(AuthenticationDataSource authData) throws 
AuthenticationException {
         AuthParams authParams = new AuthParams(authData);
@@ -138,7 +153,7 @@ public class AuthenticationProviderBasic implements 
AuthenticationProvider {
             incrementFailureMetric(errorCode);
             throw exception;
         }
-        AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), 
getAuthMethodName());
+        authenticationMetrics.recordSuccess();
         return userId;
     }
 
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
index 211f2ea006b..0e5559b3c3a 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
@@ -38,6 +38,8 @@ import org.apache.pulsar.common.api.AuthData;
 @Slf4j
 public class AuthenticationProviderList implements AuthenticationProvider {
 
+    private AuthenticationMetrics authenticationMetrics;
+
     private interface AuthProcessor<T, W> {
 
         T apply(W process) throws AuthenticationException;
@@ -49,7 +51,8 @@ public class AuthenticationProviderList implements 
AuthenticationProvider {
         AUTH_REQUIRED,
     }
 
-    static <T, W> T applyAuthProcessor(List<W> processors, AuthProcessor<T, W> 
authFunc)
+    private static <T, W> T applyAuthProcessor(List<W> processors, 
AuthenticationMetrics metrics,
+                                               AuthProcessor<T, W> authFunc)
         throws AuthenticationException {
         AuthenticationException authenticationException = null;
         String errorCode = ErrorCode.UNKNOWN.name();
@@ -67,30 +70,29 @@ public class AuthenticationProviderList implements 
AuthenticationProvider {
         }
 
         if (null == authenticationException) {
-            AuthenticationMetrics.authenticateFailure(
-                    AuthenticationProviderList.class.getSimpleName(),
+            
metrics.recordFailure(AuthenticationProviderList.class.getSimpleName(),
                     "authentication-provider-list", ErrorCode.AUTH_REQUIRED);
             throw new AuthenticationException("Authentication required");
         } else {
-            AuthenticationMetrics.authenticateFailure(
-                    AuthenticationProviderList.class.getSimpleName(),
+            
metrics.recordFailure(AuthenticationProviderList.class.getSimpleName(),
                     "authentication-provider-list", errorCode);
             throw authenticationException;
         }
-
     }
 
     private static class AuthenticationListState implements 
AuthenticationState {
 
         private final List<AuthenticationState> states;
         private volatile AuthenticationState authState;
+        private final AuthenticationMetrics metrics;
 
-        AuthenticationListState(List<AuthenticationState> states) {
+        AuthenticationListState(List<AuthenticationState> states, 
AuthenticationMetrics metrics) {
             if (states == null || states.isEmpty()) {
                 throw new IllegalArgumentException("Authentication state 
requires at least one state");
             }
             this.states = states;
             this.authState = states.get(0);
+            this.metrics = metrics;
         }
 
         private AuthenticationState getAuthState() throws 
AuthenticationException {
@@ -135,8 +137,9 @@ public class AuthenticationProviderList implements 
AuthenticationProvider {
                 if (previousException == null) {
                     previousException = new 
AuthenticationException("Authentication required");
                 }
-                
AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(),
-                        "authentication-provider-list", 
ErrorCode.AUTH_REQUIRED);
+                
metrics.recordFailure(AuthenticationProviderList.class.getSimpleName(),
+                        "authentication-provider-list",
+                        ErrorCode.AUTH_REQUIRED);
                 authChallengeFuture.completeExceptionally(previousException);
                 return;
             }
@@ -166,6 +169,7 @@ public class AuthenticationProviderList implements 
AuthenticationProvider {
         public AuthData authenticate(AuthData authData) throws 
AuthenticationException {
             return applyAuthProcessor(
                 states,
+                metrics,
                 as -> {
                     AuthData ad = as.authenticate(authData);
                     AuthenticationListState.this.authState = as;
@@ -216,8 +220,15 @@ public class AuthenticationProviderList implements 
AuthenticationProvider {
 
     @Override
     public void initialize(ServiceConfiguration config) throws IOException {
+        initialize(Context.builder().config(config).build());
+    }
+
+    @Override
+    public void initialize(Context context) throws IOException {
+        authenticationMetrics = new 
AuthenticationMetrics(context.getOpenTelemetry(),
+                getClass().getSimpleName(), getAuthMethodName());
         for (AuthenticationProvider ap : providers) {
-            ap.initialize(config);
+            ap.initialize(context);
         }
     }
 
@@ -226,6 +237,11 @@ public class AuthenticationProviderList implements 
AuthenticationProvider {
         return providers.get(0).getAuthMethodName();
     }
 
+    @Override
+    public void incrementFailureMetric(Enum<?> errorCode) {
+        authenticationMetrics.recordFailure(errorCode);
+    }
+
     @Override
     public CompletableFuture<String> 
authenticateAsync(AuthenticationDataSource authData) {
         CompletableFuture<String> roleFuture = new CompletableFuture<>();
@@ -241,7 +257,7 @@ public class AuthenticationProviderList implements 
AuthenticationProvider {
             if (previousException == null) {
                 previousException = new 
AuthenticationException("Authentication required");
             }
-            
AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(),
+            
authenticationMetrics.recordFailure(AuthenticationProviderList.class.getSimpleName(),
                     "authentication-provider-list", ErrorCode.AUTH_REQUIRED);
             roleFuture.completeExceptionally(previousException);
             return;
@@ -264,6 +280,7 @@ public class AuthenticationProviderList implements 
AuthenticationProvider {
     public String authenticate(AuthenticationDataSource authData) throws 
AuthenticationException {
         return applyAuthProcessor(
             providers,
+            authenticationMetrics,
             provider -> provider.authenticate(authData)
         );
     }
@@ -294,7 +311,7 @@ public class AuthenticationProviderList implements 
AuthenticationProvider {
                 throw new AuthenticationException("Failed to initialize a new 
auth state from " + remoteAddress);
             }
         } else {
-            return new AuthenticationListState(states);
+            return new AuthenticationListState(states, authenticationMetrics);
         }
     }
 
@@ -325,7 +342,7 @@ public class AuthenticationProviderList implements 
AuthenticationProvider {
                         "Failed to initialize a new http auth state from " + 
request.getRemoteHost());
             }
         } else {
-            return new AuthenticationListState(states);
+            return new AuthenticationListState(states, authenticationMetrics);
         }
     }
 
@@ -333,6 +350,7 @@ public class AuthenticationProviderList implements 
AuthenticationProvider {
     public boolean authenticateHttpRequest(HttpServletRequest request, 
HttpServletResponse response) throws Exception {
         Boolean authenticated = applyAuthProcessor(
             providers,
+            authenticationMetrics,
             provider -> {
                 try {
                     return provider.authenticateHttpRequest(request, response);
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java
index a4c44121b4b..f7ff47fe8e6 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java
@@ -27,6 +27,8 @@ import 
org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
 
 public class AuthenticationProviderTls implements AuthenticationProvider {
 
+    private AuthenticationMetrics authenticationMetrics;
+
     private enum ErrorCode {
         UNKNOWN,
         INVALID_CERTS,
@@ -40,7 +42,13 @@ public class AuthenticationProviderTls implements 
AuthenticationProvider {
 
     @Override
     public void initialize(ServiceConfiguration config) throws IOException {
-        // noop
+        initialize(Context.builder().config(config).build());
+    }
+
+    @Override
+    public void initialize(Context context) throws IOException {
+        authenticationMetrics = new 
AuthenticationMetrics(context.getOpenTelemetry(),
+                getClass().getSimpleName(), getAuthMethodName());
     }
 
     @Override
@@ -48,6 +56,11 @@ public class AuthenticationProviderTls implements 
AuthenticationProvider {
         return "tls";
     }
 
+    @Override
+    public void incrementFailureMetric(Enum<?> errorCode) {
+        authenticationMetrics.recordFailure(errorCode);
+    }
+
     @Override
     public String authenticate(AuthenticationDataSource authData) throws 
AuthenticationException {
         String commonName = null;
@@ -96,7 +109,7 @@ public class AuthenticationProviderTls implements 
AuthenticationProvider {
                 errorCode = ErrorCode.INVALID_CN;
                 throw new AuthenticationException("Client unable to 
authenticate with TLS certificate");
             }
-            
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), 
getAuthMethodName());
+            authenticationMetrics.recordSuccess();
         } catch (AuthenticationException exception) {
             incrementFailureMetric(errorCode);
             throw exception;
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
index f8992b21ff4..74bc85ad3ff 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.authentication;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedDataAttributeName;
 import static 
org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedRoleAttributeName;
-import com.google.common.annotations.VisibleForTesting;
 import io.jsonwebtoken.Claims;
 import io.jsonwebtoken.ExpiredJwtException;
 import io.jsonwebtoken.Jwt;
@@ -31,8 +30,6 @@ import io.jsonwebtoken.Jwts;
 import io.jsonwebtoken.RequiredTypeException;
 import io.jsonwebtoken.SignatureAlgorithm;
 import io.jsonwebtoken.security.SignatureException;
-import io.prometheus.client.Counter;
-import io.prometheus.client.Histogram;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.security.Key;
@@ -44,7 +41,7 @@ import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
+import 
org.apache.pulsar.broker.authentication.metrics.AuthenticationMetricsToken;
 import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.common.api.AuthData;
 
@@ -79,17 +76,6 @@ public class AuthenticationProviderToken implements 
AuthenticationProvider {
 
     static final String TOKEN = "token";
 
-    private static final Counter expiredTokenMetrics = Counter.build()
-            .name("pulsar_expired_token_total")
-            .help("Pulsar expired token")
-            .register();
-
-    private static final Histogram expiringTokenMinutesMetrics = 
Histogram.build()
-            .name("pulsar_expiring_token_minutes")
-            .help("The remaining time of expiring token in minutes")
-            .buckets(5, 10, 60, 240)
-            .register();
-
     private Key validationKey;
     private String roleClaim;
     private SignatureAlgorithm publicKeyAlg;
@@ -106,6 +92,8 @@ public class AuthenticationProviderToken implements 
AuthenticationProvider {
     private String confTokenAudienceSettingName;
     private String confTokenAllowedClockSkewSecondsSettingName;
 
+    private AuthenticationMetricsToken authenticationMetricsToken;
+
     public enum ErrorCode {
         INVALID_AUTH_DATA,
         INVALID_TOKEN,
@@ -117,14 +105,17 @@ public class AuthenticationProviderToken implements 
AuthenticationProvider {
         // noop
     }
 
-    @VisibleForTesting
-    public static void resetMetrics() {
-        expiredTokenMetrics.clear();
-        expiringTokenMinutesMetrics.clear();
+    @Override
+    public void initialize(ServiceConfiguration config) throws IOException {
+        initialize(Context.builder().config(config).build());
     }
 
     @Override
-    public void initialize(ServiceConfiguration config) throws IOException, 
IllegalArgumentException {
+    public void initialize(Context context) throws IOException {
+        authenticationMetricsToken = new 
AuthenticationMetricsToken(context.getOpenTelemetry(),
+                getClass().getSimpleName(), getAuthMethodName());
+
+        var config = context.getConfig();
         String prefix = (String) config.getProperty(CONF_TOKEN_SETTING_PREFIX);
         if (null == prefix) {
             prefix = "";
@@ -162,6 +153,11 @@ public class AuthenticationProviderToken implements 
AuthenticationProvider {
         return TOKEN;
     }
 
+    @Override
+    public void incrementFailureMetric(Enum<?> errorCode) {
+        authenticationMetricsToken.recordFailure(errorCode);
+    }
+
     @Override
     public String authenticate(AuthenticationDataSource authData) throws 
AuthenticationException {
         String token;
@@ -174,7 +170,7 @@ public class AuthenticationProviderToken implements 
AuthenticationProvider {
         }
         // Parse Token by validating
         String role = getPrincipal(authenticateToken(token));
-        AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), 
getAuthMethodName());
+        authenticationMetricsToken.recordSuccess();
         return role;
     }
 
@@ -263,14 +259,13 @@ public class AuthenticationProviderToken implements 
AuthenticationProvider {
                 }
             }
 
-            if (jwt.getBody().getExpiration() != null) {
-                expiringTokenMinutesMetrics.observe(
-                        (double) (jwt.getBody().getExpiration().getTime() - 
new Date().getTime()) / (60 * 1000));
-            }
+            var expiration = jwt.getBody().getExpiration();
+            var tokenRemainingDurationMs = expiration != null ? 
expiration.getTime() - new Date().getTime() : null;
+            
authenticationMetricsToken.recordTokenDuration(tokenRemainingDurationMs);
             return jwt;
         } catch (JwtException e) {
             if (e instanceof ExpiredJwtException) {
-                expiredTokenMetrics.inc();
+                authenticationMetricsToken.recordTokenExpired();
             }
             incrementFailureMetric(ErrorCode.INVALID_TOKEN);
             throw new AuthenticationException("Failed to authentication token: 
" + e.getMessage());
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
index 22296b86b4e..f6eb785d2e4 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.authentication;
 
 import static 
org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedDataAttributeName;
 import static 
org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedRoleAttributeName;
+import io.opentelemetry.api.OpenTelemetry;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -51,6 +52,11 @@ public class AuthenticationService implements Closeable {
     private final Map<String, AuthenticationProvider> providers = new 
HashMap<>();
 
     public AuthenticationService(ServiceConfiguration conf) throws 
PulsarServerException {
+        this(conf, OpenTelemetry.noop());
+    }
+
+    public AuthenticationService(ServiceConfiguration conf, OpenTelemetry 
openTelemetry)
+            throws PulsarServerException {
         anonymousUserRole = conf.getAnonymousUserRole();
         if (conf.isAuthenticationEnabled()) {
             try {
@@ -70,6 +76,10 @@ public class AuthenticationService implements Closeable {
                     providerList.add(provider);
                 }
 
+                var authenticationProviderContext = 
AuthenticationProvider.Context.builder()
+                        .config(conf)
+                        .openTelemetry(openTelemetry)
+                        .build();
                 for (Map.Entry<String, List<AuthenticationProvider>> entry : 
providerMap.entrySet()) {
                     AuthenticationProvider provider;
                     if (entry.getValue().size() == 1) {
@@ -77,7 +87,7 @@ public class AuthenticationService implements Closeable {
                     } else {
                         provider = new 
AuthenticationProviderList(entry.getValue());
                     }
-                    provider.initialize(conf);
+                    provider.initialize(authenticationProviderContext);
                     providers.put(provider.getAuthMethodName(), provider);
                     LOG.info("[{}] has been loaded.",
                         entry.getValue().stream().map(
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetrics.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetrics.java
index 5faaccbe157..931ad50e117 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetrics.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetrics.java
@@ -18,28 +18,27 @@
  */
 package org.apache.pulsar.broker.authentication.metrics;
 
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
 import io.prometheus.client.Counter;
 
 public class AuthenticationMetrics {
+    @Deprecated
     private static final Counter authSuccessMetrics = Counter.build()
             .name("pulsar_authentication_success_total")
             .help("Pulsar authentication success")
             .labelNames("provider_name", "auth_method")
             .register();
+    @Deprecated
     private static final Counter authFailuresMetrics = Counter.build()
             .name("pulsar_authentication_failures_total")
             .help("Pulsar authentication failures")
             .labelNames("provider_name", "auth_method", "reason")
             .register();
 
-    /**
-     * Log authenticate success event to the authentication metrics.
-     * @param providerName The short class name of the provider
-     * @param authMethod Authentication method name
-     */
-    public static void authenticateSuccess(String providerName, String 
authMethod) {
-        authSuccessMetrics.labels(providerName, authMethod).inc();
-    }
+    public static final String INSTRUMENTATION_SCOPE_NAME = 
"org.apache.pulsar.authentication";
 
     /**
      * Log authenticate failure event to the authentication metrics.
@@ -62,8 +61,58 @@ public class AuthenticationMetrics {
      * @param authMethod Authentication method name.
      * @param errorCode Error code.
      */
+    @Deprecated
     public static void authenticateFailure(String providerName, String 
authMethod, Enum<?> errorCode) {
         authFailuresMetrics.labels(providerName, authMethod, 
errorCode.name()).inc();
     }
 
+    public static final String AUTHENTICATION_COUNTER_METRIC_NAME = 
"pulsar.authentication.operation.count";
+    private final LongCounter authenticationCounter;
+
+    public static final AttributeKey<String> PROVIDER_KEY = 
AttributeKey.stringKey("pulsar.authentication.provider");
+    public static final AttributeKey<String> AUTH_METHOD_KEY = 
AttributeKey.stringKey("pulsar.authentication.method");
+    public static final AttributeKey<String> ERROR_CODE_KEY = 
AttributeKey.stringKey("pulsar.authentication.error");
+    public static final AttributeKey<String> AUTH_RESULT_KEY = 
AttributeKey.stringKey("pulsar.authentication.result");
+    public enum AuthenticationResult {
+        SUCCESS,
+        FAILURE;
+    }
+
+    private final String providerName;
+    private final String authMethod;
+
+    public AuthenticationMetrics(OpenTelemetry openTelemetry, String 
providerName, String authMethod) {
+        this.providerName = providerName;
+        this.authMethod = authMethod;
+        var meter = openTelemetry.getMeter(INSTRUMENTATION_SCOPE_NAME);
+        authenticationCounter = 
meter.counterBuilder(AUTHENTICATION_COUNTER_METRIC_NAME)
+                .setDescription("The number of authentication operations")
+                .setUnit("{operation}")
+                .build();
+    }
+
+    public void recordSuccess() {
+        authSuccessMetrics.labels(providerName, authMethod).inc();
+        var attributes = Attributes.of(PROVIDER_KEY, providerName,
+                AUTH_METHOD_KEY, authMethod,
+                AUTH_RESULT_KEY, 
AuthenticationResult.SUCCESS.name().toLowerCase());
+        authenticationCounter.add(1, attributes);
+    }
+
+    public void recordFailure(Enum<?> errorCode) {
+        recordFailure(providerName, authMethod, errorCode.name());
+    }
+
+    public void recordFailure(String providerName, String authMethod, Enum<?> 
errorCode) {
+        recordFailure(providerName, authMethod, errorCode.name());
+    }
+
+    public void recordFailure(String providerName, String authMethod, String 
errorCode) {
+        authenticateFailure(providerName, authMethod, errorCode);
+        var attributes = Attributes.of(PROVIDER_KEY, providerName,
+                AUTH_METHOD_KEY, authMethod,
+                AUTH_RESULT_KEY, 
AuthenticationResult.FAILURE.name().toLowerCase(),
+                ERROR_CODE_KEY, errorCode);
+        authenticationCounter.add(1, attributes);
+    }
 }
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetricsToken.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetricsToken.java
new file mode 100644
index 00000000000..4e9d1d6b16a
--- /dev/null
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetricsToken.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.common.stats.MetricsUtil;
+
+public class AuthenticationMetricsToken extends AuthenticationMetrics {
+
+    @Deprecated
+    private static final Counter expiredTokenMetrics = Counter.build()
+            .name("pulsar_expired_token_total")
+            .help("Pulsar expired token")
+            .register();
+    public static final String EXPIRED_TOKEN_COUNTER_METRIC_NAME = 
"pulsar.authentication.token.expired.count";
+    private LongCounter expiredTokensCounter;
+
+    private static final List<Long> TOKEN_DURATION_BUCKET_BOUNDARIES_SECONDS = 
List.of(
+            TimeUnit.MINUTES.toSeconds(5),
+            TimeUnit.MINUTES.toSeconds(10),
+            TimeUnit.HOURS.toSeconds(1),
+            TimeUnit.HOURS.toSeconds(4),
+            TimeUnit.DAYS.toSeconds(1),
+            TimeUnit.DAYS.toSeconds(7),
+            TimeUnit.DAYS.toSeconds(14),
+            TimeUnit.DAYS.toSeconds(30),
+            TimeUnit.DAYS.toSeconds(90),
+            TimeUnit.DAYS.toSeconds(180),
+            TimeUnit.DAYS.toSeconds(270),
+            TimeUnit.DAYS.toSeconds(365));
+
+    @Deprecated
+    private static final Histogram expiringTokenMinutesMetrics = 
Histogram.build()
+            .name("pulsar_expiring_token_minutes")
+            .help("The remaining time of expiring token in minutes")
+            .buckets(TOKEN_DURATION_BUCKET_BOUNDARIES_SECONDS.stream()
+                    .map(TimeUnit.SECONDS::toMinutes)
+                    .mapToDouble(Double::valueOf)
+                    .toArray())
+            .register();
+    public static final String EXPIRING_TOKEN_HISTOGRAM_METRIC_NAME = 
"pulsar.authentication.token.expiry.duration";
+    private DoubleHistogram expiringTokenSeconds;
+
+    public AuthenticationMetricsToken(OpenTelemetry openTelemetry, String 
providerName,
+                                      String authMethod) {
+        super(openTelemetry, providerName, authMethod);
+
+        var meter = 
openTelemetry.getMeter(AuthenticationMetrics.INSTRUMENTATION_SCOPE_NAME);
+        expiredTokensCounter = 
meter.counterBuilder(EXPIRED_TOKEN_COUNTER_METRIC_NAME)
+                .setDescription("The total number of expired tokens")
+                .setUnit("{token}")
+                .build();
+        expiringTokenSeconds = 
meter.histogramBuilder(EXPIRING_TOKEN_HISTOGRAM_METRIC_NAME)
+                .setExplicitBucketBoundariesAdvice(
+                        
TOKEN_DURATION_BUCKET_BOUNDARIES_SECONDS.stream().map(Double::valueOf).toList())
+                .setDescription("The remaining time of expiring token in 
seconds")
+                .setUnit("s")
+                .build();
+    }
+
+    public void recordTokenDuration(Long durationMs) {
+        if (durationMs == null) {
+            // Special case signals a token without expiry. OpenTelemetry 
supports reporting infinite values.
+            expiringTokenSeconds.record(Double.POSITIVE_INFINITY);
+        } else if (durationMs > 0) {
+            expiringTokenMinutesMetrics.observe(durationMs / 60_000.0d);
+            
expiringTokenSeconds.record(MetricsUtil.convertToSeconds(durationMs, 
TimeUnit.MILLISECONDS));
+        } else {
+            // Duration can be negative if token expires at processing time. 
OpenTelemetry does not support negative
+            // values, so record token expiry instead.
+            recordTokenExpired();
+        }
+    }
+
+    public void recordTokenExpired() {
+        expiredTokenMetrics.inc();
+        expiredTokensCounter.add(1);
+    }
+
+    @VisibleForTesting
+    @Deprecated
+    public static void reset() {
+        expiredTokenMetrics.clear();
+        expiringTokenMinutesMetrics.clear();
+    }
+}
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasicTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasicTest.java
index 723fde7083d..f6e4b8e969a 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasicTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasicTest.java
@@ -52,7 +52,7 @@ public class AuthenticationProviderBasicTest {
         Properties properties = new Properties();
         properties.setProperty("basicAuthConf", basicAuthConf);
         serviceConfiguration.setProperties(properties);
-        provider.initialize(serviceConfiguration);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(serviceConfiguration).build());
         testAuthenticate(provider);
     }
 
@@ -64,7 +64,7 @@ public class AuthenticationProviderBasicTest {
         Properties properties = new Properties();
         properties.setProperty("basicAuthConf", basicAuthConfBase64);
         serviceConfiguration.setProperties(properties);
-        provider.initialize(serviceConfiguration);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(serviceConfiguration).build());
         testAuthenticate(provider);
     }
 
@@ -74,7 +74,7 @@ public class AuthenticationProviderBasicTest {
         AuthenticationProviderBasic provider = new 
AuthenticationProviderBasic();
         ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
         System.setProperty("pulsar.auth.basic.conf", basicAuthConf);
-        provider.initialize(serviceConfiguration);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(serviceConfiguration).build());
         testAuthenticate(provider);
     }
 
@@ -84,7 +84,7 @@ public class AuthenticationProviderBasicTest {
         AuthenticationProviderBasic provider = new 
AuthenticationProviderBasic();
         ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
         System.setProperty("pulsar.auth.basic.conf", basicAuthConfBase64);
-        provider.initialize(serviceConfiguration);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(serviceConfiguration).build());
         testAuthenticate(provider);
     }
 
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java
index 7793a5c029f..e81198217b5 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java
@@ -29,7 +29,6 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
-
 import io.jsonwebtoken.SignatureAlgorithm;
 import io.jsonwebtoken.io.Decoders;
 import io.jsonwebtoken.security.Keys;
@@ -90,7 +89,7 @@ public class AuthenticationProviderListTest {
         );
         ServiceConfiguration confA = new ServiceConfiguration();
         confA.setProperties(propertiesA);
-        providerA.initialize(confA);
+        
providerA.initialize(AuthenticationProvider.Context.builder().config(confA).build());
 
         Properties propertiesB = new Properties();
         
propertiesB.setProperty(AuthenticationProviderToken.CONF_TOKEN_SETTING_PREFIX, 
"b");
@@ -103,7 +102,7 @@ public class AuthenticationProviderListTest {
         );
         ServiceConfiguration confB = new ServiceConfiguration();
         confB.setProperties(propertiesB);
-        providerB.initialize(confB);
+        
providerB.initialize(AuthenticationProvider.Context.builder().config(confB).build());
 
         this.authProvider = new AuthenticationProviderList(Lists.newArrayList(
             providerA, providerB
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
index f50731c7654..3e1a3e18034 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
@@ -55,7 +55,9 @@ import javax.crypto.SecretKey;
 import javax.naming.AuthenticationException;
 import javax.servlet.http.HttpServletRequest;
 import lombok.Cleanup;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import 
org.apache.pulsar.broker.authentication.metrics.AuthenticationMetricsToken;
 import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import org.apache.pulsar.common.api.AuthData;
 import org.mockito.Mockito;
@@ -70,7 +72,7 @@ public class AuthenticationProviderTokenTest {
         AuthenticationProviderToken provider = new 
AuthenticationProviderToken();
 
         try {
-            provider.initialize(new ServiceConfiguration());
+            
provider.initialize(AuthenticationProvider.Context.builder().config(new 
ServiceConfiguration()).build());
             fail("should have failed");
         } catch (IOException e) {
             // Expected, secret key was not defined
@@ -135,7 +137,7 @@ public class AuthenticationProviderTokenTest {
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         try {
             provider.authenticate(new AuthenticationDataSource() {
@@ -249,7 +251,7 @@ public class AuthenticationProviderTokenTest {
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
     }
 
     @Test
@@ -268,7 +270,7 @@ public class AuthenticationProviderTokenTest {
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         String token = AuthTokenUtils.createToken(secretKey, SUBJECT, 
Optional.empty());
 
@@ -303,7 +305,7 @@ public class AuthenticationProviderTokenTest {
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         String token = AuthTokenUtils.createToken(secretKey, SUBJECT, 
Optional.empty());
 
@@ -335,7 +337,7 @@ public class AuthenticationProviderTokenTest {
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         String token = AuthTokenUtils.createToken(secretKey, SUBJECT, 
Optional.empty());
 
@@ -370,7 +372,7 @@ public class AuthenticationProviderTokenTest {
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         // Use private key to generate token
         PrivateKey privateKey = 
AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKeyStr), 
SignatureAlgorithm.RS256);
@@ -413,8 +415,7 @@ public class AuthenticationProviderTokenTest {
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
-
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         // Use private key to generate token
         PrivateKey privateKey = 
AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKeyStr), 
SignatureAlgorithm.RS256);
@@ -460,7 +461,7 @@ public class AuthenticationProviderTokenTest {
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         // Use private key to generate token
         PrivateKey privateKey = 
AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKeyStr), 
SignatureAlgorithm.ES256);
@@ -484,8 +485,11 @@ public class AuthenticationProviderTokenTest {
     }
 
     @Test(expectedExceptions = AuthenticationException.class)
-    public void testAuthenticateWhenNoJwtPassed() throws 
AuthenticationException {
+    public void testAuthenticateWhenNoJwtPassed() throws Exception {
+        @Cleanup
         AuthenticationProviderToken provider = new 
AuthenticationProviderToken();
+        FieldUtils.writeDeclaredField(
+                provider, "authenticationMetricsToken", 
mock(AuthenticationMetricsToken.class), true);
         provider.authenticate(new AuthenticationDataSource() {
             @Override
             public boolean hasDataFromCommand() {
@@ -500,8 +504,11 @@ public class AuthenticationProviderTokenTest {
     }
 
     @Test(expectedExceptions = AuthenticationException.class)
-    public void testAuthenticateWhenAuthorizationHeaderNotExist() throws 
AuthenticationException {
+    public void testAuthenticateWhenAuthorizationHeaderNotExist() throws 
Exception {
+        @Cleanup
         AuthenticationProviderToken provider = new 
AuthenticationProviderToken();
+        FieldUtils.writeDeclaredField(
+                provider, "authenticationMetricsToken", 
mock(AuthenticationMetricsToken.class), true);
         provider.authenticate(new AuthenticationDataSource() {
             @Override
             public String getHttpHeader(String name) {
@@ -516,8 +523,11 @@ public class AuthenticationProviderTokenTest {
     }
 
     @Test(expectedExceptions = AuthenticationException.class)
-    public void testAuthenticateWhenAuthHeaderValuePrefixIsInvalid() throws 
AuthenticationException {
+    public void testAuthenticateWhenAuthHeaderValuePrefixIsInvalid() throws 
Exception {
+        @Cleanup
         AuthenticationProviderToken provider = new 
AuthenticationProviderToken();
+        FieldUtils.writeDeclaredField(
+                provider, "authenticationMetricsToken", 
mock(AuthenticationMetricsToken.class), true);
         provider.authenticate(new AuthenticationDataSource() {
             @Override
             public String getHttpHeader(String name) {
@@ -532,8 +542,11 @@ public class AuthenticationProviderTokenTest {
     }
 
     @Test(expectedExceptions = AuthenticationException.class)
-    public void testAuthenticateWhenJwtIsBlank() throws 
AuthenticationException {
+    public void testAuthenticateWhenJwtIsBlank() throws Exception {
+        @Cleanup
         AuthenticationProviderToken provider = new 
AuthenticationProviderToken();
+        FieldUtils.writeDeclaredField(
+                provider, "authenticationMetricsToken", 
mock(AuthenticationMetricsToken.class), true);
         provider.authenticate(new AuthenticationDataSource() {
             @Override
             public String getHttpHeader(String name) {
@@ -559,7 +572,7 @@ public class AuthenticationProviderTokenTest {
         conf.setProperties(properties);
 
         AuthenticationProviderToken provider = new 
AuthenticationProviderToken();
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
         provider.authenticate(new AuthenticationDataSource() {
             @Override
             public String getHttpHeader(String name) {
@@ -582,7 +595,7 @@ public class AuthenticationProviderTokenTest {
         conf.setProperties(properties);
 
         AuthenticationProviderToken provider = new 
AuthenticationProviderToken();
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
     }
 
     @Test(expectedExceptions = IOException.class)
@@ -594,7 +607,7 @@ public class AuthenticationProviderTokenTest {
         conf.setProperties(properties);
 
         AuthenticationProviderToken provider = new 
AuthenticationProviderToken();
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
     }
 
     @Test(expectedExceptions = IOException.class)
@@ -606,7 +619,7 @@ public class AuthenticationProviderTokenTest {
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
 
-        new AuthenticationProviderToken().initialize(conf);
+        new 
AuthenticationProviderToken().initialize(AuthenticationProvider.Context.builder().config(conf).build());
     }
 
     @Test(expectedExceptions = IOException.class)
@@ -618,7 +631,7 @@ public class AuthenticationProviderTokenTest {
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
 
-        new AuthenticationProviderToken().initialize(conf);
+        new 
AuthenticationProviderToken().initialize(AuthenticationProvider.Context.builder().config(conf).build());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
@@ -633,7 +646,7 @@ public class AuthenticationProviderTokenTest {
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
 
-        new AuthenticationProviderToken().initialize(conf);
+        new 
AuthenticationProviderToken().initialize(AuthenticationProvider.Context.builder().config(conf).build());
     }
 
     @Test(expectedExceptions = IOException.class)
@@ -645,7 +658,7 @@ public class AuthenticationProviderTokenTest {
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
 
-        new AuthenticationProviderToken().initialize(conf);
+        new 
AuthenticationProviderToken().initialize(AuthenticationProvider.Context.builder().config(conf).build());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
@@ -657,7 +670,7 @@ public class AuthenticationProviderTokenTest {
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
 
-        new AuthenticationProviderToken().initialize(conf);
+        new 
AuthenticationProviderToken().initialize(AuthenticationProvider.Context.builder().config(conf).build());
     }
 
 
@@ -676,7 +689,7 @@ public class AuthenticationProviderTokenTest {
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         // Create a token that will expire in 3 seconds
         String expiringToken = AuthTokenUtils.createToken(secretKey, SUBJECT,
@@ -709,7 +722,7 @@ public class AuthenticationProviderTokenTest {
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         // Create a token that is already expired
         String expiringToken = AuthTokenUtils.createToken(secretKey, SUBJECT,
@@ -828,7 +841,7 @@ public class AuthenticationProviderTokenTest {
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         // Use private key to generate token
         PrivateKey privateKey = 
AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKeyStr), 
SignatureAlgorithm.RS256);
@@ -877,7 +890,7 @@ public class AuthenticationProviderTokenTest {
                 );
         
Mockito.when(mockConf.getProperty(AuthenticationProviderToken.CONF_TOKEN_SETTING_PREFIX)).thenReturn(prefix);
 
-        provider.initialize(mockConf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(mockConf).build());
 
         // Each property is fetched only once. Prevent multiple fetches.
         Mockito.verify(mockConf, 
Mockito.times(1)).getProperty(AuthenticationProviderToken.CONF_TOKEN_SETTING_PREFIX);
@@ -908,7 +921,7 @@ public class AuthenticationProviderTokenTest {
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         String token = AuthTokenUtils.createToken(secretKey, SUBJECT, 
Optional.empty());
         HttpServletRequest servletRequest = mock(HttpServletRequest.class);
@@ -934,7 +947,7 @@ public class AuthenticationProviderTokenTest {
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         String token = AuthTokenUtils.createToken(secretKey, SUBJECT, 
Optional.empty());
         HttpServletRequest servletRequest = mock(HttpServletRequest.class);
@@ -960,7 +973,7 @@ public class AuthenticationProviderTokenTest {
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         AuthenticationState authState = provider.newAuthState(null,null, null);
 
@@ -1016,7 +1029,7 @@ public class AuthenticationProviderTokenTest {
         
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_SECRET_KEY, 
secretKeyFile.toString());
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         String token = createTokenWithAudience(secretKey, audienceClaim, 
audiences);
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e5248d45d42..c0e3e7d356b 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -398,7 +398,8 @@ public class BrokerService implements Closeable {
                 .name("pulsar-backlog-quota-checker")
                 .numThreads(1)
                 .build();
-        this.authenticationService = new 
AuthenticationService(pulsar.getConfiguration());
+        this.authenticationService = new 
AuthenticationService(pulsar.getConfiguration(),
+                pulsar.getOpenTelemetry().getOpenTelemetry());
         this.blockedDispatchers =
                 
ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
         this.topicFactory = createPersistentTopicFactory();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
index 178da8b8498..065b03e6454 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.stats;
 
 import com.google.common.annotations.VisibleForTesting;
+import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.api.metrics.Meter;
 import 
io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
 import java.io.Closeable;
@@ -50,6 +51,10 @@ public class PulsarBrokerOpenTelemetry implements Closeable {
         meter = 
openTelemetryService.getOpenTelemetry().getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
     }
 
+    public OpenTelemetry getOpenTelemetry() {
+        return openTelemetryService.getOpenTelemetry();
+    }
+
     @Override
     public void close() {
         openTelemetryService.close();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
index 726bde3f3d0..27bdb2e3004 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
@@ -32,7 +32,7 @@ import lombok.Cleanup;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import 
org.apache.pulsar.broker.authentication.metrics.AuthenticationMetricsToken;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -52,7 +52,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
     @Override
     protected void setup() throws Exception {
         super.baseSetup();
-        AuthenticationProviderToken.resetMetrics();
+        AuthenticationMetricsToken.reset();
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthenticationStatsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthenticationStatsTest.java
new file mode 100644
index 00000000000..4cde37b50ff
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthenticationStatsTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static 
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static 
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.opentelemetry.api.common.Attributes;
+import java.time.Duration;
+import java.util.Date;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import javax.naming.AuthenticationException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
+import 
org.apache.pulsar.broker.authentication.metrics.AuthenticationMetricsToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OpenTelemetryAuthenticationStatsTest extends BrokerTestBase {
+
+    private static final Duration AUTHENTICATION_TIMEOUT = 
Duration.ofSeconds(1);
+
+    private SecretKey secretKey;
+    private AuthenticationProvider provider;
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+
+        secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+        provider = new AuthenticationProviderToken();
+        registerCloseable(provider);
+
+        var properties = new Properties();
+        properties.setProperty("tokenSecretKey", 
AuthTokenUtils.encodeKeyBase64(secretKey));
+
+        var conf = new ServiceConfiguration();
+        conf.setProperties(properties);
+
+        var authenticationProviderContext = 
AuthenticationProvider.Context.builder()
+                .config(conf)
+                .openTelemetry(pulsar.getOpenTelemetry().getOpenTelemetry())
+                .build();
+        provider.initialize(authenticationProviderContext);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void 
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+        super.customizeMainPulsarTestContextBuilder(builder);
+        builder.enableOpenTelemetry(true);
+    }
+
+    @Test
+    public void testAuthenticationSuccess() {
+        // Authenticate with a token passed as Pulsar protocol command data
+        assertThat(provider.authenticateAsync(new 
TestAuthenticationDataSource(Optional.empty())))
+                .succeedsWithin(AUTHENTICATION_TIMEOUT);
+        
assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(),
+                AuthenticationMetrics.AUTHENTICATION_COUNTER_METRIC_NAME,
+                Attributes.of(AuthenticationMetrics.PROVIDER_KEY, 
"AuthenticationProviderToken",
+                        AuthenticationMetrics.AUTH_RESULT_KEY, "success",
+                        AuthenticationMetrics.AUTH_METHOD_KEY, "token"),
+                1);
+    }
+
+    @Test
+    public void testTokenDurationHistogram() {
+        // Token with expiry 15 seconds into the future
+        var expiryTime = Optional.of(new Date(System.currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(15)));
+        assertThat(provider.authenticateAsync(new 
TestAuthenticationDataSource(expiryTime)))
+                .succeedsWithin(AUTHENTICATION_TIMEOUT);
+        // Token without expiry
+        assertThat(provider.authenticateAsync(new 
TestAuthenticationDataSource(Optional.empty())))
+                .succeedsWithin(AUTHENTICATION_TIMEOUT);
+        
assertThat(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics())
+                .anySatisfy(metric -> assertThat(metric)
+                        
.hasName(AuthenticationMetricsToken.EXPIRING_TOKEN_HISTOGRAM_METRIC_NAME)
+                        .hasHistogramSatisfying(histogram -> 
histogram.hasPointsSatisfying(
+                                histogramPoint -> 
histogramPoint.hasCount(2).hasMax(Double.POSITIVE_INFINITY))));
+    }
+
+    @Test
+    public void testAuthenticationFailure() {
+        // Authentication should fail if credentials are not passed.
+        assertThat(provider.authenticateAsync(new AuthenticationDataSource() { 
}))
+                .failsWithin(AUTHENTICATION_TIMEOUT)
+                .withThrowableThat()
+                .withRootCauseInstanceOf(AuthenticationException.class)
+                .withMessageContaining("No token credentials passed");
+        
assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(),
+                AuthenticationMetrics.AUTHENTICATION_COUNTER_METRIC_NAME,
+                Attributes.of(AuthenticationMetrics.PROVIDER_KEY, 
"AuthenticationProviderToken",
+                        AuthenticationMetrics.AUTH_RESULT_KEY, "failure",
+                        AuthenticationMetrics.AUTH_METHOD_KEY, "token",
+                        AuthenticationMetrics.ERROR_CODE_KEY, 
"INVALID_AUTH_DATA"),
+                1);
+    }
+
+    @Test
+    public void testTokenExpired() {
+        var expiredDate = Optional.of(new Date(System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(1)));
+        assertThat(provider.authenticateAsync(new 
TestAuthenticationDataSource(expiredDate)))
+                .failsWithin(AUTHENTICATION_TIMEOUT)
+                .withThrowableThat()
+                .withRootCauseInstanceOf(AuthenticationException.class)
+                .withMessageContaining("JWT expired");
+        
assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(),
+                AuthenticationMetricsToken.EXPIRED_TOKEN_COUNTER_METRIC_NAME, 
Attributes.empty(), 1);
+    }
+
+    private class TestAuthenticationDataSource implements 
AuthenticationDataSource {
+        private final String token;
+
+        public TestAuthenticationDataSource(Optional<Date> expiryTime) {
+            token = AuthTokenUtils.createToken(secretKey, "subject", 
expiryTime);
+        }
+
+        @Override
+        public boolean hasDataFromCommand() {
+            return true;
+        }
+
+        @Override
+        public String getCommandData() {
+            return token;
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index e7f86d542a0..4df2d36a953 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.stats;
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
 import static 
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -70,7 +71,9 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.PrometheusMetricsTestUtil;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import 
org.apache.pulsar.broker.authentication.metrics.AuthenticationMetricsToken;
 import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
 import 
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
 import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
@@ -108,7 +111,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
     @Override
     protected void setup() throws Exception {
         super.baseSetup();
-        AuthenticationProviderToken.resetMetrics();
+        AuthenticationMetricsToken.reset();
     }
 
     @Override
@@ -1499,7 +1502,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         try {
             provider.authenticate(new AuthenticationDataSource() {
@@ -1563,7 +1566,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         Date expiredDate = new Date(System.currentTimeMillis() - 
TimeUnit.HOURS.toMillis(1));
         String expiredToken = AuthTokenUtils.createToken(secretKey, "subject", 
Optional.of(expiredDate));
@@ -1599,6 +1602,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
     public void testExpiringTokenMetrics() throws Exception {
         SecretKey secretKey = 
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
 
+        @Cleanup
         AuthenticationProviderToken provider = new 
AuthenticationProviderToken();
 
         Properties properties = new Properties();
@@ -1606,7 +1610,7 @@ public class PrometheusMetricsTest extends BrokerTestBase 
{
 
         ServiceConfiguration conf = new ServiceConfiguration();
         conf.setProperties(properties);
-        provider.initialize(conf);
+        
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
 
         int[] tokenRemainTime = new int[]{3, 7, 40, 100, 400};
 
@@ -1633,27 +1637,19 @@ public class PrometheusMetricsTest extends 
BrokerTestBase {
         Metric countMetric = ((List<Metric>) 
metrics.get("pulsar_expiring_token_minutes_count")).get(0);
         assertEquals(countMetric.value, tokenRemainTime.length);
         List<Metric> cm = (List<Metric>) 
metrics.get("pulsar_expiring_token_minutes_bucket");
-        assertEquals(cm.size(), 5);
+        var buckets = cm.stream().map(m -> 
m.tags.get("le")).collect(Collectors.toSet());
+        assertThat(buckets).isEqualTo(Set.of("5.0", "10.0", "60.0", "240.0", 
"1440.0", "10080.0", "20160.0", "43200.0",
+                "129600.0", "259200.0", "388800.0", "525600.0", "+Inf"));
         cm.forEach((e) -> {
-            switch (e.tags.get("le")) {
-                case "5.0":
-                    assertEquals(e.value, 1);
-                    break;
-                case "10.0":
-                    assertEquals(e.value, 2);
-                    break;
-                case "60.0":
-                    assertEquals(e.value, 3);
-                    break;
-                case "240.0":
-                    assertEquals(e.value, 4);
-                    break;
-                default:
-                    assertEquals(e.value, 5);
-                    break;
-            }
+            var expectedValue = switch(e.tags.get("le")) {
+                case "5.0" -> 1;
+                case "10.0" -> 2;
+                case "60.0" -> 3;
+                case "240.0" -> 4;
+                default -> 5;
+            };
+            assertEquals(e.value, expectedValue);
         });
-        provider.close();
     }
 
     @Test

Reply via email to