This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 587af853fbf [feat][misc] PIP-264: Add OpenTelemetry authentication and
token metrics (#23016)
587af853fbf is described below
commit 587af853fbf976d5007e17dba910a4a14e3e85e8
Author: Dragos Misca <[email protected]>
AuthorDate: Wed Aug 28 20:50:52 2024 -0700
[feat][misc] PIP-264: Add OpenTelemetry authentication and token metrics
(#23016)
---
.../AuthenticationProviderAthenz.java | 17 ++-
.../AuthenticationProviderAthenzTest.java | 14 +-
.../oidc/AuthenticationProviderOpenID.java | 27 ++--
.../broker/authentication/oidc/JwksCache.java | 26 ++--
.../oidc/OpenIDProviderMetadataCache.java | 24 +--
...uthenticationProviderOpenIDIntegrationTest.java | 20 +--
.../oidc/AuthenticationProviderOpenIDTest.java | 111 +++++++-------
.../authentication/AuthenticationProviderSasl.java | 6 +
.../authentication/SaslAuthenticateTest.java | 4 +-
pulsar-broker-common/pom.xml | 5 +
.../authentication/AuthenticationProvider.java | 25 ++++
.../AuthenticationProviderBasic.java | 17 ++-
.../authentication/AuthenticationProviderList.java | 44 ++++--
.../authentication/AuthenticationProviderTls.java | 17 ++-
.../AuthenticationProviderToken.java | 47 +++---
.../authentication/AuthenticationService.java | 12 +-
.../metrics/AuthenticationMetrics.java | 65 ++++++++-
.../metrics/AuthenticationMetricsToken.java | 109 ++++++++++++++
.../AuthenticationProviderBasicTest.java | 8 +-
.../AuthenticationProviderListTest.java | 5 +-
.../AuthenticationProviderTokenTest.java | 73 ++++++----
.../pulsar/broker/service/BrokerService.java | 3 +-
.../broker/stats/PulsarBrokerOpenTelemetry.java | 5 +
.../broker/stats/MetadataStoreStatsTest.java | 4 +-
.../OpenTelemetryAuthenticationStatsTest.java | 161 +++++++++++++++++++++
.../pulsar/broker/stats/PrometheusMetricsTest.java | 42 +++---
26 files changed, 674 insertions(+), 217 deletions(-)
diff --git
a/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java
b/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java
index 652a922b9a5..499ebefc8a0 100644
---
a/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java
+++
b/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java
@@ -43,6 +43,8 @@ public class AuthenticationProviderAthenz implements
AuthenticationProvider {
private List<String> domainNameList = null;
private int allowedOffset = 30;
+ private AuthenticationMetrics authenticationMetrics;
+
public enum ErrorCode {
UNKNOWN,
NO_CLIENT,
@@ -54,6 +56,14 @@ public class AuthenticationProviderAthenz implements
AuthenticationProvider {
@Override
public void initialize(ServiceConfiguration config) throws IOException {
+ initialize(Context.builder().config(config).build());
+ }
+
+ @Override
+ public void initialize(Context context) throws IOException {
+ authenticationMetrics = new
AuthenticationMetrics(context.getOpenTelemetry(),
+ getClass().getSimpleName(), getAuthMethodName());
+ var config = context.getConfig();
String domainNames;
if (config.getProperty(DOMAIN_NAME_LIST) != null) {
domainNames = (String) config.getProperty(DOMAIN_NAME_LIST);
@@ -86,6 +96,11 @@ public class AuthenticationProviderAthenz implements
AuthenticationProvider {
return "athenz";
}
+ @Override
+ public void incrementFailureMetric(Enum<?> errorCode) {
+ authenticationMetrics.recordFailure(errorCode);
+ }
+
@Override
public String authenticate(AuthenticationDataSource authData) throws
AuthenticationException {
SocketAddress clientAddress;
@@ -141,7 +156,7 @@ public class AuthenticationProviderAthenz implements
AuthenticationProvider {
if (token.validate(ztsPublicKey, allowedOffset, false, null)) {
log.debug("Athenz Role Token : {}, Authenticated for
Client: {}", roleToken, clientAddress);
-
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(),
getAuthMethodName());
+ authenticationMetrics.recordSuccess();
return token.getPrincipal();
} else {
errorCode = ErrorCode.INVALID_TOKEN;
diff --git
a/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java
b/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java
index a5211c2f814..63dcd093978 100644
---
a/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java
+++
b/pulsar-broker-auth-athenz/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenzTest.java
@@ -20,10 +20,8 @@ package org.apache.pulsar.broker.authentication;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
-
import com.yahoo.athenz.auth.token.RoleToken;
import com.yahoo.athenz.zpe.ZpeConsts;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
@@ -31,9 +29,7 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
-
import javax.naming.AuthenticationException;
-
import org.apache.pulsar.broker.ServiceConfiguration;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -55,7 +51,7 @@ public class AuthenticationProviderAthenzTest {
// Initialize authentication provider
provider = new AuthenticationProviderAthenz();
- provider.initialize(config);
+
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
// Specify Athenz configuration file for AuthZpeClient which is used
in AuthenticationProviderAthenz
System.setProperty(ZpeConsts.ZPE_PROP_ATHENZ_CONF,
"./src/test/resources/athenz.conf.test");
@@ -69,7 +65,7 @@ public class AuthenticationProviderAthenzTest {
emptyConf.setProperties(emptyProp);
AuthenticationProviderAthenz sysPropProvider1 = new
AuthenticationProviderAthenz();
try {
- sysPropProvider1.initialize(emptyConf);
+
sysPropProvider1.initialize(AuthenticationProvider.Context.builder().config(emptyConf).build());
assertEquals(sysPropProvider1.getAllowedOffset(), 30); // default
allowed offset is 30 sec
} catch (Exception e) {
fail("Fail to Read pulsar.athenz.domain.names from System
Properties");
@@ -78,7 +74,7 @@ public class AuthenticationProviderAthenzTest {
System.setProperty("pulsar.athenz.role.token_allowed_offset", "0");
AuthenticationProviderAthenz sysPropProvider2 = new
AuthenticationProviderAthenz();
try {
- sysPropProvider2.initialize(config);
+
sysPropProvider2.initialize(AuthenticationProvider.Context.builder().config(config).build());
assertEquals(sysPropProvider2.getAllowedOffset(), 0);
} catch (Exception e) {
fail("Failed to get allowed offset from system property");
@@ -87,7 +83,7 @@ public class AuthenticationProviderAthenzTest {
System.setProperty("pulsar.athenz.role.token_allowed_offset",
"invalid");
AuthenticationProviderAthenz sysPropProvider3 = new
AuthenticationProviderAthenz();
try {
- sysPropProvider3.initialize(config);
+
sysPropProvider3.initialize(AuthenticationProvider.Context.builder().config(config).build());
fail("Invalid allowed offset should not be specified");
} catch (IOException e) {
}
@@ -95,7 +91,7 @@ public class AuthenticationProviderAthenzTest {
System.setProperty("pulsar.athenz.role.token_allowed_offset", "-1");
AuthenticationProviderAthenz sysPropProvider4 = new
AuthenticationProviderAthenz();
try {
- sysPropProvider4.initialize(config);
+
sysPropProvider4.initialize(AuthenticationProvider.Context.builder().config(config).build());
fail("Negative allowed offset should not be specified");
} catch (IOException e) {
}
diff --git
a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java
b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java
index a9d812c10b0..38f61809133 100644
---
a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java
+++
b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java
@@ -88,8 +88,6 @@ import org.slf4j.LoggerFactory;
public class AuthenticationProviderOpenID implements AuthenticationProvider {
private static final Logger log =
LoggerFactory.getLogger(AuthenticationProviderOpenID.class);
- private static final String SIMPLE_NAME =
AuthenticationProviderOpenID.class.getSimpleName();
-
// Must match the value used by the OAuth2 Client Plugin.
private static final String AUTH_METHOD_NAME = "token";
@@ -148,8 +146,18 @@ public class AuthenticationProviderOpenID implements
AuthenticationProvider {
private String[] allowedAudiences;
private ApiClient k8sApiClient;
+ private AuthenticationMetrics authenticationMetrics;
+
@Override
public void initialize(ServiceConfiguration config) throws IOException {
+ initialize(Context.builder().config(config).build());
+ }
+
+ @Override
+ public void initialize(Context context) throws IOException {
+ authenticationMetrics = new
AuthenticationMetrics(context.getOpenTelemetry(),
+ getClass().getSimpleName(), getAuthMethodName());
+ var config = context.getConfig();
this.allowedAudiences =
validateAllowedAudiences(getConfigValueAsSet(config, ALLOWED_AUDIENCES));
this.roleClaim = getConfigValueAsString(config, ROLE_CLAIM,
ROLE_CLAIM_DEFAULT);
this.isRoleClaimNotSubject = !ROLE_CLAIM_DEFAULT.equals(roleClaim);
@@ -181,8 +189,8 @@ public class AuthenticationProviderOpenID implements
AuthenticationProvider {
.build();
httpClient = new DefaultAsyncHttpClient(clientConfig);
k8sApiClient = fallbackDiscoveryMode != FallbackDiscoveryMode.DISABLED
? Config.defaultClient() : null;
- this.openIDProviderMetadataCache = new
OpenIDProviderMetadataCache(config, httpClient, k8sApiClient);
- this.jwksCache = new JwksCache(config, httpClient, k8sApiClient);
+ this.openIDProviderMetadataCache = new
OpenIDProviderMetadataCache(this, config, httpClient, k8sApiClient);
+ this.jwksCache = new JwksCache(this, config, httpClient, k8sApiClient);
}
@Override
@@ -190,6 +198,11 @@ public class AuthenticationProviderOpenID implements
AuthenticationProvider {
return AUTH_METHOD_NAME;
}
+ @Override
+ public void incrementFailureMetric(Enum<?> errorCode) {
+ authenticationMetrics.recordFailure(errorCode);
+ }
+
/**
* Authenticate the parameterized {@link AuthenticationDataSource} by
verifying the issuer is an allowed issuer,
* then retrieving the JWKS URI from the issuer, then retrieving the
Public key from the JWKS URI, and finally
@@ -219,7 +232,7 @@ public class AuthenticationProviderOpenID implements
AuthenticationProvider {
return authenticateToken(token)
.whenComplete((jwt, e) -> {
if (jwt != null) {
-
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(),
getAuthMethodName());
+ authenticationMetrics.recordSuccess();
}
// Failure metrics are incremented within methods above
});
@@ -463,10 +476,6 @@ public class AuthenticationProviderOpenID implements
AuthenticationProvider {
}
}
- static void incrementFailureMetric(AuthenticationExceptionCode code) {
- AuthenticationMetrics.authenticateFailure(SIMPLE_NAME,
AUTH_METHOD_NAME, code);
- }
-
/**
* Validate the configured allow list of allowedIssuers. The
allowedIssuers set must be nonempty in order for
* the plugin to authenticate any token. Thus, it fails initialization if
the configuration is
diff --git
a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
index 73934e9c1e0..c88661c39c6 100644
---
a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
+++
b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.authentication.oidc;
+import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY;
import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS;
import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS_DEFAULT;
import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_REFRESH_AFTER_WRITE_SECONDS;
@@ -26,7 +27,6 @@ import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationProvide
import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE_DEFAULT;
import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.KEY_ID_CACHE_MISS_REFRESH_SECONDS;
import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.KEY_ID_CACHE_MISS_REFRESH_SECONDS_DEFAULT;
-import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.incrementFailureMetric;
import static
org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsInt;
import com.auth0.jwk.Jwk;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.asynchttpclient.AsyncHttpClient;
public class JwksCache {
@@ -60,8 +61,11 @@ public class JwksCache {
private final ObjectReader reader = new
ObjectMapper().readerFor(HashMap.class);
private final AsyncHttpClient httpClient;
private final OpenidApi openidApi;
+ private final AuthenticationProvider authenticationProvider;
- JwksCache(ServiceConfiguration config, AsyncHttpClient httpClient,
ApiClient apiClient) throws IOException {
+ JwksCache(AuthenticationProvider authenticationProvider,
ServiceConfiguration config,
+ AsyncHttpClient httpClient, ApiClient apiClient) throws
IOException {
+ this.authenticationProvider = authenticationProvider;
// Store the clients
this.httpClient = httpClient;
this.openidApi = apiClient != null ? new OpenidApi(apiClient) : null;
@@ -91,7 +95,7 @@ public class JwksCache {
CompletableFuture<Jwk> getJwk(String jwksUri, String keyId) {
if (jwksUri == null) {
-
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
return CompletableFuture.failedFuture(new
IllegalArgumentException("jwksUri must not be null."));
}
return getJwkAndMaybeReload(Optional.of(jwksUri), keyId, false);
@@ -139,10 +143,10 @@ public class JwksCache {
reader.readValue(result.getResponseBodyAsBytes());
future.complete(convertToJwks(jwksUri, jwks));
} catch (AuthenticationException e) {
-
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
future.completeExceptionally(e);
} catch (Exception e) {
-
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
future.completeExceptionally(new
AuthenticationException(
"Error retrieving public key at " + jwksUri +
": " + e.getMessage()));
}
@@ -152,7 +156,7 @@ public class JwksCache {
CompletableFuture<Jwk> getJwkFromKubernetesApiServer(String keyId) {
if (openidApi == null) {
-
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
return CompletableFuture.failedFuture(new AuthenticationException(
"Failed to retrieve public key from Kubernetes API server:
Kubernetes fallback is not enabled."));
}
@@ -165,7 +169,7 @@ public class JwksCache {
openidApi.getServiceAccountIssuerOpenIDKeysetAsync(new
ApiCallback<String>() {
@Override
public void onFailure(ApiException e, int statusCode,
Map<String, List<String>> responseHeaders) {
-
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
// We want the message and responseBody here:
https://github.com/kubernetes-client/java/issues/2066.
future.completeExceptionally(
new AuthenticationException("Failed to retrieve
public key from Kubernetes API server. "
@@ -178,10 +182,10 @@ public class JwksCache {
HashMap<String, Object> jwks =
reader.readValue(result);
future.complete(convertToJwks("Kubernetes API server",
jwks));
} catch (AuthenticationException e) {
-
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
future.completeExceptionally(e);
} catch (Exception e) {
-
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
future.completeExceptionally(new
AuthenticationException(
"Error retrieving public key at Kubernetes API
server: " + e.getMessage()));
}
@@ -198,7 +202,7 @@ public class JwksCache {
}
});
} catch (ApiException e) {
-
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
future.completeExceptionally(
new AuthenticationException("Failed to retrieve public key
from Kubernetes API server: "
+ e.getMessage()));
@@ -212,7 +216,7 @@ public class JwksCache {
return jwk;
}
}
-
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PUBLIC_KEY);
throw new IllegalArgumentException("No JWK found for Key ID " + keyId);
}
diff --git
a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
index 111399adbd7..cffa52b00aa 100644
---
a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
+++
b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
@@ -18,13 +18,13 @@
*/
package org.apache.pulsar.broker.authentication.oidc;
+import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA;
import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS;
import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS_DEFAULT;
import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_REFRESH_AFTER_WRITE_SECONDS;
import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_REFRESH_AFTER_WRITE_SECONDS_DEFAULT;
import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE;
import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE_DEFAULT;
-import static
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.incrementFailureMetric;
import static
org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsInt;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.naming.AuthenticationException;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.asynchttpclient.AsyncHttpClient;
/**
@@ -51,13 +52,16 @@ import org.asynchttpclient.AsyncHttpClient;
class OpenIDProviderMetadataCache {
private final ObjectReader reader = new
ObjectMapper().readerFor(OpenIDProviderMetadata.class);
+ private final AuthenticationProvider authenticationProvider;
private final AsyncHttpClient httpClient;
private final WellKnownApi wellKnownApi;
private final AsyncLoadingCache<Optional<String>, OpenIDProviderMetadata>
cache;
private static final String WELL_KNOWN_OPENID_CONFIG =
".well-known/openid-configuration";
private static final String SLASH_WELL_KNOWN_OPENID_CONFIG = "/" +
WELL_KNOWN_OPENID_CONFIG;
- OpenIDProviderMetadataCache(ServiceConfiguration config, AsyncHttpClient
httpClient, ApiClient apiClient) {
+ OpenIDProviderMetadataCache(AuthenticationProvider authenticationProvider,
ServiceConfiguration config,
+ AsyncHttpClient httpClient, ApiClient
apiClient) {
+ this.authenticationProvider = authenticationProvider;
int maxSize = getConfigValueAsInt(config, CACHE_SIZE,
CACHE_SIZE_DEFAULT);
int refreshAfterWriteSeconds = getConfigValueAsInt(config,
CACHE_REFRESH_AFTER_WRITE_SECONDS,
CACHE_REFRESH_AFTER_WRITE_SECONDS_DEFAULT);
@@ -124,10 +128,10 @@ class OpenIDProviderMetadataCache {
verifyIssuer(issuer, openIDProviderMetadata, false);
future.complete(openIDProviderMetadata);
} catch (AuthenticationException e) {
-
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PROVIDER_METADATA);
future.completeExceptionally(e);
} catch (Exception e) {
-
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PROVIDER_METADATA);
future.completeExceptionally(new
AuthenticationException(
"Error retrieving OpenID Provider Metadata at
" + issuer + ": " + e.getMessage()));
}
@@ -151,7 +155,7 @@ class OpenIDProviderMetadataCache {
verifyIssuer(issClaim, openIDProviderMetadata, true);
future.complete(openIDProviderMetadata);
} catch (AuthenticationException e) {
-
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PROVIDER_METADATA);
future.completeExceptionally(e);
}
return future;
@@ -164,7 +168,7 @@ class OpenIDProviderMetadataCache {
wellKnownApi.getServiceAccountIssuerOpenIDConfigurationAsync(new
ApiCallback<>() {
@Override
public void onFailure(ApiException e, int statusCode,
Map<String, List<String>> responseHeaders) {
-
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PROVIDER_METADATA);
// We want the message and responseBody here:
https://github.com/kubernetes-client/java/issues/2066.
future.completeExceptionally(new AuthenticationException(
"Error retrieving OpenID Provider Metadata from
Kubernetes API server. Message: "
@@ -179,7 +183,7 @@ class OpenIDProviderMetadataCache {
OpenIDProviderMetadata openIDProviderMetadata =
reader.readValue(result);
future.complete(openIDProviderMetadata);
} catch (Exception e) {
-
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PROVIDER_METADATA);
future.completeExceptionally(new
AuthenticationException(
"Error retrieving OpenID Provider Metadata
from Kubernetes API Server: "
+ e.getMessage()));
@@ -197,7 +201,7 @@ class OpenIDProviderMetadataCache {
}
});
} catch (ApiException e) {
-
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+
authenticationProvider.incrementFailureMetric(ERROR_RETRIEVING_PROVIDER_METADATA);
future.completeExceptionally(new AuthenticationException(
"Error retrieving OpenID Provider Metadata from Kubernetes
API server: " + e.getMessage()));
}
@@ -221,10 +225,10 @@ class OpenIDProviderMetadataCache {
boolean isK8s) throws AuthenticationException {
if (!issuer.equals(metadata.getIssuer())) {
if (isK8s) {
-
incrementFailureMetric(AuthenticationExceptionCode.UNSUPPORTED_ISSUER);
+
authenticationProvider.incrementFailureMetric(AuthenticationExceptionCode.UNSUPPORTED_ISSUER);
throw new AuthenticationException("Issuer not allowed: " +
issuer);
} else {
-
incrementFailureMetric(AuthenticationExceptionCode.ISSUER_MISMATCH);
+
authenticationProvider.incrementFailureMetric(AuthenticationExceptionCode.ISSUER_MISMATCH);
throw new AuthenticationException(String.format("Issuer URL
mismatch: [%s] should match [%s]",
issuer, metadata.getIssuer()));
}
diff --git
a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
index e11fd8395a5..f4663a9ee3c 100644
---
a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
+++
b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
@@ -260,7 +260,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
Files.write(Path.of(System.getenv("KUBECONFIG")),
kubeConfig.getBytes());
provider = new AuthenticationProviderOpenID();
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
}
@AfterClass
@@ -358,7 +358,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
@Cleanup
AuthenticationProviderOpenID provider = new
AuthenticationProviderOpenID();
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
String role = "superuser";
String token = generateToken(validJwk, issuerWithMissingKid, role,
"allowed-audience", 0L, 0L, 10000L);
@@ -379,7 +379,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
@Cleanup
AuthenticationProviderOpenID provider = new
AuthenticationProviderOpenID();
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
String role = "superuser";
String token = generateToken(validJwk, issuerWithMissingKid, role,
"allowed-audience", 0L, 0L, 10000L);
@@ -407,7 +407,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
@Cleanup
AuthenticationProviderOpenID provider = new
AuthenticationProviderOpenID();
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
String role = "superuser";
// We use the normal issuer on the token because the /k8s endpoint is
configured via the kube config file
@@ -441,7 +441,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
@Cleanup
AuthenticationProviderOpenID provider = new
AuthenticationProviderOpenID();
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
String role = "superuser";
String token = generateToken(validJwk, "http://not-the-k8s-issuer",
role, "allowed-audience", 0L, 0L, 10000L);
@@ -468,7 +468,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
@Cleanup
AuthenticationProviderOpenID provider = new
AuthenticationProviderOpenID();
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
String role = "superuser";
String token = generateToken(validJwk, issuer, role,
"allowed-audience", 0L, 0L, 10000L);
@@ -499,7 +499,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
@Cleanup
AuthenticationProviderOpenID provider = new
AuthenticationProviderOpenID();
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
String role = "superuser";
String token = generateToken(validJwk, "http://not-the-k8s-issuer",
role, "allowed-audience", 0L, 0L, 10000L);
@@ -562,7 +562,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
props.setProperty(AuthenticationProviderOpenID.ACCEPTED_TIME_LEEWAY_SECONDS,
"10");
@Cleanup
AuthenticationProviderOpenID provider = new
AuthenticationProviderOpenID();
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
String role = "superuser";
String token = generateToken(validJwk, issuer, role,
"allowed-audience", 0L, 0L, 0L);
@@ -635,7 +635,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
props.setProperty(AuthenticationProviderOpenID.ISSUER_TRUST_CERTS_FILE_PATH,
caCert);
ServiceConfiguration config = new ServiceConfiguration();
config.setProperties(props);
- provider.initialize(config);
+
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
// Build a JWT with a custom claim
HashMap<String, Object> claims = new HashMap();
@@ -656,7 +656,7 @@ public class AuthenticationProviderOpenIDIntegrationTest {
props.setProperty(AuthenticationProviderOpenID.ISSUER_TRUST_CERTS_FILE_PATH,
caCert);
ServiceConfiguration config = new ServiceConfiguration();
config.setProperties(props);
- provider.initialize(config);
+
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
// Build a JWT without the "test" claim, which should cause the
authentication to fail
String token = generateToken(validJwk, issuer, "not-my-role",
"allowed-audience", 0L,
diff --git
a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDTest.java
b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDTest.java
index f5bb584d16f..4a12f61528a 100644
---
a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDTest.java
+++
b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.authentication.oidc;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertNull;
import com.auth0.jwt.JWT;
import com.auth0.jwt.interfaces.DecodedJWT;
@@ -29,9 +30,9 @@ import java.security.KeyPair;
import java.sql.Date;
import java.time.Instant;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
@@ -39,6 +40,7 @@ import javax.naming.AuthenticationException;
import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -59,16 +61,31 @@ import org.testng.annotations.Test;
public class AuthenticationProviderOpenIDTest {
// https://www.rfc-editor.org/rfc/rfc7518#section-3.1
+ private static final Set<SignatureAlgorithm> SUPPORTED_ALGORITHMS = Set.of(
+ SignatureAlgorithm.RS256,
+ SignatureAlgorithm.RS384,
+ SignatureAlgorithm.RS512,
+ SignatureAlgorithm.ES256,
+ SignatureAlgorithm.ES384,
+ SignatureAlgorithm.ES512
+ );
+
@DataProvider(name = "supportedAlgorithms")
public static Object[][] supportedAlgorithms() {
- return new Object[][] {
- { SignatureAlgorithm.RS256 },
- { SignatureAlgorithm.RS384 },
- { SignatureAlgorithm.RS512 },
- { SignatureAlgorithm.ES256 },
- { SignatureAlgorithm.ES384 },
- { SignatureAlgorithm.ES512 }
- };
+ return buildDataProvider(SUPPORTED_ALGORITHMS);
+ }
+
+ @DataProvider(name = "unsupportedAlgorithms")
+ public static Object[][] unsupportedAlgorithms() {
+ var unsupportedAlgorithms = Set.of(SignatureAlgorithm.values())
+ .stream()
+ .filter(alg -> !SUPPORTED_ALGORITHMS.contains(alg))
+ .toList();
+ return buildDataProvider(unsupportedAlgorithms);
+ }
+
+ private static Object[][] buildDataProvider(Collection<?> collection) {
+ return collection.stream().map(o -> new Object[] { o
}).toArray(Object[][]::new);
}
// Provider to use in common tests that are not verifying the
configuration of the provider itself.
@@ -83,7 +100,7 @@ public class AuthenticationProviderOpenIDTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
basicProvider = new AuthenticationProviderOpenID();
- basicProvider.initialize(conf);
+
basicProvider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
}
@AfterClass
@@ -100,29 +117,19 @@ public class AuthenticationProviderOpenIDTest {
}
@Test
- public void testThatNullAlgFails() throws IOException {
- @Cleanup
- AuthenticationProviderOpenID provider = new
AuthenticationProviderOpenID();
- Assert.assertThrows(AuthenticationException.class,
- () -> provider.verifyJWT(null, null, null));
+ public void testThatNullAlgFails() {
+ assertThatThrownBy(() -> basicProvider.verifyJWT(null, null, null))
+ .isInstanceOf(AuthenticationException.class)
+ .hasMessage("PublicKey algorithm cannot be null");
}
- @Test
- public void testThatUnsupportedAlgsThrowExceptions() {
- Set<SignatureAlgorithm> unsupportedAlgs = new
HashSet<>(Set.of(SignatureAlgorithm.values()));
- Arrays.stream(supportedAlgorithms()).map(o -> (SignatureAlgorithm)
o[0]).toList()
- .forEach(unsupportedAlgs::remove);
- unsupportedAlgs.forEach(unsupportedAlg -> {
- try {
- @Cleanup
- AuthenticationProviderOpenID provider = new
AuthenticationProviderOpenID();
- // We don't create a public key because it's irrelevant
- Assert.assertThrows(AuthenticationException.class,
- () -> provider.verifyJWT(null,
unsupportedAlg.getValue(), null));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
+ @Test(dataProvider = "unsupportedAlgorithms")
+ public void testThatUnsupportedAlgsThrowExceptions(SignatureAlgorithm
unsupportedAlg) {
+ var algorithm = unsupportedAlg.getValue();
+ // We don't create a public key because it's irrelevant
+ assertThatThrownBy(() -> basicProvider.verifyJWT(null, algorithm,
null))
+ .isInstanceOf(AuthenticationException.class)
+ .hasMessage("Unsupported algorithm: " + algorithm);
}
@Test(dataProvider = "supportedAlgorithms")
@@ -141,29 +148,27 @@ public class AuthenticationProviderOpenIDTest {
@Test
public void
testThatSupportedAlgWithMismatchedPublicKeyFromDifferentAlgFamilyFails() throws
IOException {
KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
- @Cleanup
- AuthenticationProviderOpenID provider = new
AuthenticationProviderOpenID();
DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
addValidMandatoryClaims(defaultJwtBuilder, basicProviderAudience);
defaultJwtBuilder.signWith(keyPair.getPrivate());
DecodedJWT jwt = JWT.decode(defaultJwtBuilder.compact());
// Choose a different algorithm from a different alg family
- Assert.assertThrows(AuthenticationException.class,
- () -> provider.verifyJWT(keyPair.getPublic(),
SignatureAlgorithm.ES512.getValue(), jwt));
+ assertThatThrownBy(() -> basicProvider.verifyJWT(keyPair.getPublic(),
SignatureAlgorithm.ES512.getValue(), jwt))
+ .isInstanceOf(AuthenticationException.class)
+ .hasMessage("Expected PublicKey alg [ES512] does match actual
alg.");
}
@Test
- public void
testThatSupportedAlgWithMismatchedPublicKeyFromSameAlgFamilyFails() throws
IOException {
+ public void
testThatSupportedAlgWithMismatchedPublicKeyFromSameAlgFamilyFails() {
KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
- @Cleanup
- AuthenticationProviderOpenID provider = new
AuthenticationProviderOpenID();
DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
addValidMandatoryClaims(defaultJwtBuilder, basicProviderAudience);
defaultJwtBuilder.signWith(keyPair.getPrivate());
DecodedJWT jwt = JWT.decode(defaultJwtBuilder.compact());
// Choose a different algorithm but within the same alg family as above
- Assert.assertThrows(AuthenticationException.class,
- () -> provider.verifyJWT(keyPair.getPublic(),
SignatureAlgorithm.RS512.getValue(), jwt));
+ assertThatThrownBy(() -> basicProvider.verifyJWT(keyPair.getPublic(),
SignatureAlgorithm.RS512.getValue(), jwt))
+ .isInstanceOf(AuthenticationException.class)
+ .hasMessageStartingWith("JWT algorithm does not match Public
Key algorithm");
}
@Test
@@ -217,7 +222,7 @@ public class AuthenticationProviderOpenIDTest {
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS,
"https://localhost:8080");
ServiceConfiguration config = new ServiceConfiguration();
config.setProperties(props);
- provider.initialize(config);
+
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
// Build the JWT with an only recently expired token
DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
@@ -244,7 +249,8 @@ public class AuthenticationProviderOpenIDTest {
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS,
"");
ServiceConfiguration config = new ServiceConfiguration();
config.setProperties(props);
- Assert.assertThrows(IllegalArgumentException.class, () ->
provider.initialize(config));
+ Assert.assertThrows(IllegalArgumentException.class,
+ () ->
provider.initialize(AuthenticationProvider.Context.builder().config(config).build()));
}
@Test
@@ -256,7 +262,8 @@ public class AuthenticationProviderOpenIDTest {
props.setProperty(AuthenticationProviderOpenID.FALLBACK_DISCOVERY_MODE,
"DISABLED");
ServiceConfiguration config = new ServiceConfiguration();
config.setProperties(props);
- Assert.assertThrows(IllegalArgumentException.class, () ->
provider.initialize(config));
+ Assert.assertThrows(IllegalArgumentException.class,
+ () ->
provider.initialize(AuthenticationProvider.Context.builder().config(config).build()));
}
@Test
@@ -269,7 +276,7 @@ public class AuthenticationProviderOpenIDTest {
props.setProperty(AuthenticationProviderOpenID.FALLBACK_DISCOVERY_MODE,
"KUBERNETES_DISCOVER_TRUSTED_ISSUER");
ServiceConfiguration config = new ServiceConfiguration();
config.setProperties(props);
- provider.initialize(config);
+
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
}
@Test
@@ -282,7 +289,7 @@ public class AuthenticationProviderOpenIDTest {
props.setProperty(AuthenticationProviderOpenID.FALLBACK_DISCOVERY_MODE,
"KUBERNETES_DISCOVER_PUBLIC_KEYS");
ServiceConfiguration config = new ServiceConfiguration();
config.setProperties(props);
- provider.initialize(config);
+
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
}
@Test
@@ -292,7 +299,8 @@ public class AuthenticationProviderOpenIDTest {
ServiceConfiguration config = new ServiceConfiguration();
// Make sure this still defaults to null.
assertNull(config.getProperties().get(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS));
- Assert.assertThrows(IllegalArgumentException.class, () ->
provider.initialize(config));
+ Assert.assertThrows(IllegalArgumentException.class,
+ () ->
provider.initialize(AuthenticationProvider.Context.builder().config(config).build()));
}
@Test
@@ -303,7 +311,8 @@ public class AuthenticationProviderOpenIDTest {
props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS,
"https://myissuer.com,http://myissuer.com");
ServiceConfiguration config = new ServiceConfiguration();
config.setProperties(props);
- Assert.assertThrows(IllegalArgumentException.class, () ->
provider.initialize(config));
+ Assert.assertThrows(IllegalArgumentException.class,
+ () ->
provider.initialize(AuthenticationProvider.Context.builder().config(config).build()));
}
@Test void ensureMissingRoleClaimReturnsNull() throws Exception {
@@ -325,7 +334,7 @@ public class AuthenticationProviderOpenIDTest {
props.setProperty(AuthenticationProviderOpenID.ROLE_CLAIM, "sub");
ServiceConfiguration config = new ServiceConfiguration();
config.setProperties(props);
- provider.initialize(config);
+
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
// Build an empty JWT
DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
@@ -345,7 +354,7 @@ public class AuthenticationProviderOpenIDTest {
props.setProperty(AuthenticationProviderOpenID.ROLE_CLAIM, "roles");
ServiceConfiguration config = new ServiceConfiguration();
config.setProperties(props);
- provider.initialize(config);
+
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
// Build an empty JWT
DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
@@ -367,7 +376,7 @@ public class AuthenticationProviderOpenIDTest {
props.setProperty(AuthenticationProviderOpenID.ROLE_CLAIM, "roles");
ServiceConfiguration config = new ServiceConfiguration();
config.setProperties(props);
- provider.initialize(config);
+
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
// Build an empty JWT
DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
@@ -389,7 +398,7 @@ public class AuthenticationProviderOpenIDTest {
props.setProperty(AuthenticationProviderOpenID.ROLE_CLAIM, "roles");
ServiceConfiguration config = new ServiceConfiguration();
config.setProperties(props);
- provider.initialize(config);
+
provider.initialize(AuthenticationProvider.Context.builder().config(config).build());
// Build an empty JWT
DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
diff --git
a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
index 2616f90c664..f8841193ba2 100644
---
a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
+++
b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
@@ -78,6 +78,12 @@ public class AuthenticationProviderSasl implements
AuthenticationProvider {
@Override
public void initialize(ServiceConfiguration config) throws IOException {
+ initialize(Context.builder().config(config).build());
+ }
+
+ @Override
+ public void initialize(Context context) throws IOException {
+ var config = context.getConfig();
this.configuration = new HashMap<>();
final String allowedIdsPatternRegExp =
config.getSaslJaasClientAllowedIds();
configuration.put(JAAS_CLIENT_ALLOWED_IDS, allowedIdsPatternRegExp);
diff --git
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index ae282a49dc3..226ec15d33a 100644
---
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -312,7 +312,7 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
AuthenticationProviderSasl saslServer = new
AuthenticationProviderSasl();
// The cache expiration time is set to 50ms. Residual auth info should
be cleaned up
conf.setInflightSaslContextExpiryMs(50);
- saslServer.initialize(conf);
+
saslServer.initialize(AuthenticationProvider.Context.builder().config(conf).build());
HttpServletRequest servletRequest = mock(HttpServletRequest.class);
doReturn("Init").when(servletRequest).getHeader("State");
@@ -360,7 +360,7 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
doReturn("Init").when(servletRequest).getHeader("State");
conf.setInflightSaslContextExpiryMs(Integer.MAX_VALUE);
conf.setMaxInflightSaslContext(1);
- saslServer.initialize(conf);
+
saslServer.initialize(AuthenticationProvider.Context.builder().config(conf).build());
// add 10 inflight sasl context
for (int i = 0; i < 10; i++) {
AuthenticationDataProvider dataProvider =
authSasl.getAuthData("localhost");
diff --git a/pulsar-broker-common/pom.xml b/pulsar-broker-common/pom.xml
index 713ae538d7d..b04d08c6c8f 100644
--- a/pulsar-broker-common/pom.xml
+++ b/pulsar-broker-common/pom.xml
@@ -49,6 +49,11 @@
<artifactId>simpleclient_jetty</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-api</artifactId>
+ </dependency>
+
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
index 7862a35b5e8..d0a3a487b34 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProvider.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.authentication;
import static
org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedDataAttributeName;
import static
org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedRoleAttributeName;
+import io.opentelemetry.api.OpenTelemetry;
import java.io.Closeable;
import java.io.IOException;
import java.net.SocketAddress;
@@ -29,6 +30,8 @@ import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import lombok.Builder;
+import lombok.Value;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
import org.apache.pulsar.common.api.AuthData;
@@ -47,8 +50,30 @@ public interface AuthenticationProvider extends Closeable {
* @throws IOException
* if the initialization fails
*/
+ @Deprecated(since = "3.4.0")
void initialize(ServiceConfiguration config) throws IOException;
+ @Builder
+ @Value
+ class Context {
+ ServiceConfiguration config;
+
+ @Builder.Default
+ OpenTelemetry openTelemetry = OpenTelemetry.noop();
+ }
+
+ /**
+ * Perform initialization for the authentication provider.
+ *
+ * @param context
+ * the authentication provider context
+ * @throws IOException
+ * if the initialization fails
+ */
+ default void initialize(Context context) throws IOException {
+ initialize(context.getConfig());
+ }
+
/**
* @return the authentication method name supported by this provider
*/
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java
index ca5150c9bdb..91bf56a071c 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasic.java
@@ -46,6 +46,8 @@ public class AuthenticationProviderBasic implements
AuthenticationProvider {
private static final String CONF_PULSAR_PROPERTY_KEY = "basicAuthConf";
private Map<String, String> users;
+ private AuthenticationMetrics authenticationMetrics;
+
private enum ErrorCode {
UNKNOWN,
EMPTY_AUTH_DATA,
@@ -75,6 +77,14 @@ public class AuthenticationProviderBasic implements
AuthenticationProvider {
@Override
public void initialize(ServiceConfiguration config) throws IOException {
+ initialize(Context.builder().config(config).build());
+ }
+
+ @Override
+ public void initialize(Context context) throws IOException {
+ authenticationMetrics = new
AuthenticationMetrics(context.getOpenTelemetry(),
+ getClass().getSimpleName(), getAuthMethodName());
+ var config = context.getConfig();
String data =
config.getProperties().getProperty(CONF_PULSAR_PROPERTY_KEY);
if (StringUtils.isEmpty(data)) {
data = System.getProperty(CONF_SYSTEM_PROPERTY_KEY);
@@ -106,6 +116,11 @@ public class AuthenticationProviderBasic implements
AuthenticationProvider {
return "basic";
}
+ @Override
+ public void incrementFailureMetric(Enum<?> errorCode) {
+ authenticationMetrics.recordFailure(errorCode);
+ }
+
@Override
public String authenticate(AuthenticationDataSource authData) throws
AuthenticationException {
AuthParams authParams = new AuthParams(authData);
@@ -138,7 +153,7 @@ public class AuthenticationProviderBasic implements
AuthenticationProvider {
incrementFailureMetric(errorCode);
throw exception;
}
- AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(),
getAuthMethodName());
+ authenticationMetrics.recordSuccess();
return userId;
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
index 211f2ea006b..0e5559b3c3a 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderList.java
@@ -38,6 +38,8 @@ import org.apache.pulsar.common.api.AuthData;
@Slf4j
public class AuthenticationProviderList implements AuthenticationProvider {
+ private AuthenticationMetrics authenticationMetrics;
+
private interface AuthProcessor<T, W> {
T apply(W process) throws AuthenticationException;
@@ -49,7 +51,8 @@ public class AuthenticationProviderList implements
AuthenticationProvider {
AUTH_REQUIRED,
}
- static <T, W> T applyAuthProcessor(List<W> processors, AuthProcessor<T, W>
authFunc)
+ private static <T, W> T applyAuthProcessor(List<W> processors,
AuthenticationMetrics metrics,
+ AuthProcessor<T, W> authFunc)
throws AuthenticationException {
AuthenticationException authenticationException = null;
String errorCode = ErrorCode.UNKNOWN.name();
@@ -67,30 +70,29 @@ public class AuthenticationProviderList implements
AuthenticationProvider {
}
if (null == authenticationException) {
- AuthenticationMetrics.authenticateFailure(
- AuthenticationProviderList.class.getSimpleName(),
+
metrics.recordFailure(AuthenticationProviderList.class.getSimpleName(),
"authentication-provider-list", ErrorCode.AUTH_REQUIRED);
throw new AuthenticationException("Authentication required");
} else {
- AuthenticationMetrics.authenticateFailure(
- AuthenticationProviderList.class.getSimpleName(),
+
metrics.recordFailure(AuthenticationProviderList.class.getSimpleName(),
"authentication-provider-list", errorCode);
throw authenticationException;
}
-
}
private static class AuthenticationListState implements
AuthenticationState {
private final List<AuthenticationState> states;
private volatile AuthenticationState authState;
+ private final AuthenticationMetrics metrics;
- AuthenticationListState(List<AuthenticationState> states) {
+ AuthenticationListState(List<AuthenticationState> states,
AuthenticationMetrics metrics) {
if (states == null || states.isEmpty()) {
throw new IllegalArgumentException("Authentication state
requires at least one state");
}
this.states = states;
this.authState = states.get(0);
+ this.metrics = metrics;
}
private AuthenticationState getAuthState() throws
AuthenticationException {
@@ -135,8 +137,9 @@ public class AuthenticationProviderList implements
AuthenticationProvider {
if (previousException == null) {
previousException = new
AuthenticationException("Authentication required");
}
-
AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(),
- "authentication-provider-list",
ErrorCode.AUTH_REQUIRED);
+
metrics.recordFailure(AuthenticationProviderList.class.getSimpleName(),
+ "authentication-provider-list",
+ ErrorCode.AUTH_REQUIRED);
authChallengeFuture.completeExceptionally(previousException);
return;
}
@@ -166,6 +169,7 @@ public class AuthenticationProviderList implements
AuthenticationProvider {
public AuthData authenticate(AuthData authData) throws
AuthenticationException {
return applyAuthProcessor(
states,
+ metrics,
as -> {
AuthData ad = as.authenticate(authData);
AuthenticationListState.this.authState = as;
@@ -216,8 +220,15 @@ public class AuthenticationProviderList implements
AuthenticationProvider {
@Override
public void initialize(ServiceConfiguration config) throws IOException {
+ initialize(Context.builder().config(config).build());
+ }
+
+ @Override
+ public void initialize(Context context) throws IOException {
+ authenticationMetrics = new
AuthenticationMetrics(context.getOpenTelemetry(),
+ getClass().getSimpleName(), getAuthMethodName());
for (AuthenticationProvider ap : providers) {
- ap.initialize(config);
+ ap.initialize(context);
}
}
@@ -226,6 +237,11 @@ public class AuthenticationProviderList implements
AuthenticationProvider {
return providers.get(0).getAuthMethodName();
}
+ @Override
+ public void incrementFailureMetric(Enum<?> errorCode) {
+ authenticationMetrics.recordFailure(errorCode);
+ }
+
@Override
public CompletableFuture<String>
authenticateAsync(AuthenticationDataSource authData) {
CompletableFuture<String> roleFuture = new CompletableFuture<>();
@@ -241,7 +257,7 @@ public class AuthenticationProviderList implements
AuthenticationProvider {
if (previousException == null) {
previousException = new
AuthenticationException("Authentication required");
}
-
AuthenticationMetrics.authenticateFailure(AuthenticationProviderList.class.getSimpleName(),
+
authenticationMetrics.recordFailure(AuthenticationProvider.class.getSimpleName(),
"authentication-provider-list", ErrorCode.AUTH_REQUIRED);
roleFuture.completeExceptionally(previousException);
return;
@@ -264,6 +280,7 @@ public class AuthenticationProviderList implements
AuthenticationProvider {
public String authenticate(AuthenticationDataSource authData) throws
AuthenticationException {
return applyAuthProcessor(
providers,
+ authenticationMetrics,
provider -> provider.authenticate(authData)
);
}
@@ -294,7 +311,7 @@ public class AuthenticationProviderList implements
AuthenticationProvider {
throw new AuthenticationException("Failed to initialize a new
auth state from " + remoteAddress);
}
} else {
- return new AuthenticationListState(states);
+ return new AuthenticationListState(states, authenticationMetrics);
}
}
@@ -325,7 +342,7 @@ public class AuthenticationProviderList implements
AuthenticationProvider {
"Failed to initialize a new http auth state from " +
request.getRemoteHost());
}
} else {
- return new AuthenticationListState(states);
+ return new AuthenticationListState(states, authenticationMetrics);
}
}
@@ -333,6 +350,7 @@ public class AuthenticationProviderList implements
AuthenticationProvider {
public boolean authenticateHttpRequest(HttpServletRequest request,
HttpServletResponse response) throws Exception {
Boolean authenticated = applyAuthProcessor(
providers,
+ authenticationMetrics,
provider -> {
try {
return provider.authenticateHttpRequest(request, response);
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java
index a4c44121b4b..f7ff47fe8e6 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java
@@ -27,6 +27,8 @@ import
org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
public class AuthenticationProviderTls implements AuthenticationProvider {
+ private AuthenticationMetrics authenticationMetrics;
+
private enum ErrorCode {
UNKNOWN,
INVALID_CERTS,
@@ -40,7 +42,13 @@ public class AuthenticationProviderTls implements
AuthenticationProvider {
@Override
public void initialize(ServiceConfiguration config) throws IOException {
- // noop
+ initialize(Context.builder().config(config).build());
+ }
+
+ @Override
+ public void initialize(Context context) throws IOException {
+ authenticationMetrics = new
AuthenticationMetrics(context.getOpenTelemetry(),
+ getClass().getSimpleName(), getAuthMethodName());
}
@Override
@@ -48,6 +56,11 @@ public class AuthenticationProviderTls implements
AuthenticationProvider {
return "tls";
}
+ @Override
+ public void incrementFailureMetric(Enum<?> errorCode) {
+ authenticationMetrics.recordFailure(errorCode);
+ }
+
@Override
public String authenticate(AuthenticationDataSource authData) throws
AuthenticationException {
String commonName = null;
@@ -96,7 +109,7 @@ public class AuthenticationProviderTls implements
AuthenticationProvider {
errorCode = ErrorCode.INVALID_CN;
throw new AuthenticationException("Client unable to
authenticate with TLS certificate");
}
-
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(),
getAuthMethodName());
+ authenticationMetrics.recordSuccess();
} catch (AuthenticationException exception) {
incrementFailureMetric(errorCode);
throw exception;
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
index f8992b21ff4..74bc85ad3ff 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.broker.authentication;
import static java.nio.charset.StandardCharsets.UTF_8;
import static
org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedDataAttributeName;
import static
org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedRoleAttributeName;
-import com.google.common.annotations.VisibleForTesting;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.ExpiredJwtException;
import io.jsonwebtoken.Jwt;
@@ -31,8 +30,6 @@ import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.RequiredTypeException;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.security.SignatureException;
-import io.prometheus.client.Counter;
-import io.prometheus.client.Histogram;
import java.io.IOException;
import java.net.SocketAddress;
import java.security.Key;
@@ -44,7 +41,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
+import
org.apache.pulsar.broker.authentication.metrics.AuthenticationMetricsToken;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.common.api.AuthData;
@@ -79,17 +76,6 @@ public class AuthenticationProviderToken implements
AuthenticationProvider {
static final String TOKEN = "token";
- private static final Counter expiredTokenMetrics = Counter.build()
- .name("pulsar_expired_token_total")
- .help("Pulsar expired token")
- .register();
-
- private static final Histogram expiringTokenMinutesMetrics =
Histogram.build()
- .name("pulsar_expiring_token_minutes")
- .help("The remaining time of expiring token in minutes")
- .buckets(5, 10, 60, 240)
- .register();
-
private Key validationKey;
private String roleClaim;
private SignatureAlgorithm publicKeyAlg;
@@ -106,6 +92,8 @@ public class AuthenticationProviderToken implements
AuthenticationProvider {
private String confTokenAudienceSettingName;
private String confTokenAllowedClockSkewSecondsSettingName;
+ private AuthenticationMetricsToken authenticationMetricsToken;
+
public enum ErrorCode {
INVALID_AUTH_DATA,
INVALID_TOKEN,
@@ -117,14 +105,17 @@ public class AuthenticationProviderToken implements
AuthenticationProvider {
// noop
}
- @VisibleForTesting
- public static void resetMetrics() {
- expiredTokenMetrics.clear();
- expiringTokenMinutesMetrics.clear();
+ @Override
+ public void initialize(ServiceConfiguration config) throws IOException {
+ initialize(Context.builder().config(config).build());
}
@Override
- public void initialize(ServiceConfiguration config) throws IOException,
IllegalArgumentException {
+ public void initialize(Context context) throws IOException {
+ authenticationMetricsToken = new
AuthenticationMetricsToken(context.getOpenTelemetry(),
+ getClass().getSimpleName(), getAuthMethodName());
+
+ var config = context.getConfig();
String prefix = (String) config.getProperty(CONF_TOKEN_SETTING_PREFIX);
if (null == prefix) {
prefix = "";
@@ -162,6 +153,11 @@ public class AuthenticationProviderToken implements
AuthenticationProvider {
return TOKEN;
}
+ @Override
+ public void incrementFailureMetric(Enum<?> errorCode) {
+ authenticationMetricsToken.recordFailure(errorCode);
+ }
+
@Override
public String authenticate(AuthenticationDataSource authData) throws
AuthenticationException {
String token;
@@ -174,7 +170,7 @@ public class AuthenticationProviderToken implements
AuthenticationProvider {
}
// Parse Token by validating
String role = getPrincipal(authenticateToken(token));
- AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(),
getAuthMethodName());
+ authenticationMetricsToken.recordSuccess();
return role;
}
@@ -263,14 +259,13 @@ public class AuthenticationProviderToken implements
AuthenticationProvider {
}
}
- if (jwt.getBody().getExpiration() != null) {
- expiringTokenMinutesMetrics.observe(
- (double) (jwt.getBody().getExpiration().getTime() -
new Date().getTime()) / (60 * 1000));
- }
+ var expiration = jwt.getBody().getExpiration();
+ var tokenRemainingDurationMs = expiration != null ?
expiration.getTime() - new Date().getTime() : null;
+
authenticationMetricsToken.recordTokenDuration(tokenRemainingDurationMs);
return jwt;
} catch (JwtException e) {
if (e instanceof ExpiredJwtException) {
- expiredTokenMetrics.inc();
+ authenticationMetricsToken.recordTokenExpired();
}
incrementFailureMetric(ErrorCode.INVALID_TOKEN);
throw new AuthenticationException("Failed to authentication token:
" + e.getMessage());
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
index 22296b86b4e..f6eb785d2e4 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationService.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.authentication;
import static
org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedDataAttributeName;
import static
org.apache.pulsar.broker.web.AuthenticationFilter.AuthenticatedRoleAttributeName;
+import io.opentelemetry.api.OpenTelemetry;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
@@ -51,6 +52,11 @@ public class AuthenticationService implements Closeable {
private final Map<String, AuthenticationProvider> providers = new
HashMap<>();
public AuthenticationService(ServiceConfiguration conf) throws
PulsarServerException {
+ this(conf, OpenTelemetry.noop());
+ }
+
+ public AuthenticationService(ServiceConfiguration conf, OpenTelemetry
openTelemetry)
+ throws PulsarServerException {
anonymousUserRole = conf.getAnonymousUserRole();
if (conf.isAuthenticationEnabled()) {
try {
@@ -70,6 +76,10 @@ public class AuthenticationService implements Closeable {
providerList.add(provider);
}
+ var authenticationProviderContext =
AuthenticationProvider.Context.builder()
+ .config(conf)
+ .openTelemetry(openTelemetry)
+ .build();
for (Map.Entry<String, List<AuthenticationProvider>> entry :
providerMap.entrySet()) {
AuthenticationProvider provider;
if (entry.getValue().size() == 1) {
@@ -77,7 +87,7 @@ public class AuthenticationService implements Closeable {
} else {
provider = new
AuthenticationProviderList(entry.getValue());
}
- provider.initialize(conf);
+ provider.initialize(authenticationProviderContext);
providers.put(provider.getAuthMethodName(), provider);
LOG.info("[{}] has been loaded.",
entry.getValue().stream().map(
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetrics.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetrics.java
index 5faaccbe157..931ad50e117 100644
---
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetrics.java
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetrics.java
@@ -18,28 +18,27 @@
*/
package org.apache.pulsar.broker.authentication.metrics;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
import io.prometheus.client.Counter;
public class AuthenticationMetrics {
+ @Deprecated
private static final Counter authSuccessMetrics = Counter.build()
.name("pulsar_authentication_success_total")
.help("Pulsar authentication success")
.labelNames("provider_name", "auth_method")
.register();
+ @Deprecated
private static final Counter authFailuresMetrics = Counter.build()
.name("pulsar_authentication_failures_total")
.help("Pulsar authentication failures")
.labelNames("provider_name", "auth_method", "reason")
.register();
- /**
- * Log authenticate success event to the authentication metrics.
- * @param providerName The short class name of the provider
- * @param authMethod Authentication method name
- */
- public static void authenticateSuccess(String providerName, String
authMethod) {
- authSuccessMetrics.labels(providerName, authMethod).inc();
- }
+ public static final String INSTRUMENTATION_SCOPE_NAME =
"org.apache.pulsar.authentication";
/**
* Log authenticate failure event to the authentication metrics.
@@ -62,8 +61,58 @@ public class AuthenticationMetrics {
* @param authMethod Authentication method name.
* @param errorCode Error code.
*/
+ @Deprecated
public static void authenticateFailure(String providerName, String
authMethod, Enum<?> errorCode) {
authFailuresMetrics.labels(providerName, authMethod,
errorCode.name()).inc();
}
+ public static final String AUTHENTICATION_COUNTER_METRIC_NAME =
"pulsar.authentication.operation.count";
+ private final LongCounter authenticationCounter;
+
+ public static final AttributeKey<String> PROVIDER_KEY =
AttributeKey.stringKey("pulsar.authentication.provider");
+ public static final AttributeKey<String> AUTH_METHOD_KEY =
AttributeKey.stringKey("pulsar.authentication.method");
+ public static final AttributeKey<String> ERROR_CODE_KEY =
AttributeKey.stringKey("pulsar.authentication.error");
+ public static final AttributeKey<String> AUTH_RESULT_KEY =
AttributeKey.stringKey("pulsar.authentication.result");
+ public enum AuthenticationResult {
+ SUCCESS,
+ FAILURE;
+ }
+
+ private final String providerName;
+ private final String authMethod;
+
+ public AuthenticationMetrics(OpenTelemetry openTelemetry, String
providerName, String authMethod) {
+ this.providerName = providerName;
+ this.authMethod = authMethod;
+ var meter = openTelemetry.getMeter(INSTRUMENTATION_SCOPE_NAME);
+ authenticationCounter =
meter.counterBuilder(AUTHENTICATION_COUNTER_METRIC_NAME)
+ .setDescription("The number of authentication operations")
+ .setUnit("{operation}")
+ .build();
+ }
+
+ public void recordSuccess() {
+ authSuccessMetrics.labels(providerName, authMethod).inc();
+ var attributes = Attributes.of(PROVIDER_KEY, providerName,
+ AUTH_METHOD_KEY, authMethod,
+ AUTH_RESULT_KEY,
AuthenticationResult.SUCCESS.name().toLowerCase());
+ authenticationCounter.add(1, attributes);
+ }
+
+ public void recordFailure(Enum<?> errorCode) {
+ recordFailure(providerName, authMethod, errorCode.name());
+ }
+
+ public void recordFailure(String providerName, String authMethod, Enum<?>
errorCode) {
+ recordFailure(providerName, authMethod, errorCode.name());
+ }
+
+ public void recordFailure(String providerName, String authMethod, String
errorCode) {
+ authenticateFailure(providerName, authMethod, errorCode);
+ var attributes = Attributes.of(PROVIDER_KEY, providerName,
+ AUTH_METHOD_KEY, authMethod,
+ AUTH_RESULT_KEY,
AuthenticationResult.FAILURE.name().toLowerCase(),
+ ERROR_CODE_KEY, errorCode);
+ authenticationCounter.add(1, attributes);
+ }
}
diff --git
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetricsToken.java
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetricsToken.java
new file mode 100644
index 00000000000..4e9d1d6b16a
--- /dev/null
+++
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/metrics/AuthenticationMetricsToken.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.metrics.DoubleHistogram;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.common.stats.MetricsUtil;
+
+public class AuthenticationMetricsToken extends AuthenticationMetrics {
+
+ @Deprecated
+ private static final Counter expiredTokenMetrics = Counter.build()
+ .name("pulsar_expired_token_total")
+ .help("Pulsar expired token")
+ .register();
+ public static final String EXPIRED_TOKEN_COUNTER_METRIC_NAME =
"pulsar.authentication.token.expired.count";
+ private LongCounter expiredTokensCounter;
+
+ private static final List<Long> TOKEN_DURATION_BUCKET_BOUNDARIES_SECONDS =
List.of(
+ TimeUnit.MINUTES.toSeconds(5),
+ TimeUnit.MINUTES.toSeconds(10),
+ TimeUnit.HOURS.toSeconds(1),
+ TimeUnit.HOURS.toSeconds(4),
+ TimeUnit.DAYS.toSeconds(1),
+ TimeUnit.DAYS.toSeconds(7),
+ TimeUnit.DAYS.toSeconds(14),
+ TimeUnit.DAYS.toSeconds(30),
+ TimeUnit.DAYS.toSeconds(90),
+ TimeUnit.DAYS.toSeconds(180),
+ TimeUnit.DAYS.toSeconds(270),
+ TimeUnit.DAYS.toSeconds(365));
+
+ @Deprecated
+ private static final Histogram expiringTokenMinutesMetrics =
Histogram.build()
+ .name("pulsar_expiring_token_minutes")
+ .help("The remaining time of expiring token in minutes")
+ .buckets(TOKEN_DURATION_BUCKET_BOUNDARIES_SECONDS.stream()
+ .map(TimeUnit.SECONDS::toMinutes)
+ .mapToDouble(Double::valueOf)
+ .toArray())
+ .register();
+ public static final String EXPIRING_TOKEN_HISTOGRAM_METRIC_NAME =
"pulsar.authentication.token.expiry.duration";
+ private DoubleHistogram expiringTokenSeconds;
+
+ public AuthenticationMetricsToken(OpenTelemetry openTelemetry, String
providerName,
+ String authMethod) {
+ super(openTelemetry, providerName, authMethod);
+
+ var meter =
openTelemetry.getMeter(AuthenticationMetrics.INSTRUMENTATION_SCOPE_NAME);
+ expiredTokensCounter =
meter.counterBuilder(EXPIRED_TOKEN_COUNTER_METRIC_NAME)
+ .setDescription("The total number of expired tokens")
+ .setUnit("{token}")
+ .build();
+ expiringTokenSeconds =
meter.histogramBuilder(EXPIRING_TOKEN_HISTOGRAM_METRIC_NAME)
+ .setExplicitBucketBoundariesAdvice(
+
TOKEN_DURATION_BUCKET_BOUNDARIES_SECONDS.stream().map(Double::valueOf).toList())
+ .setDescription("The remaining time of expiring token in
seconds")
+ .setUnit("s")
+ .build();
+ }
+
+ public void recordTokenDuration(Long durationMs) {
+ if (durationMs == null) {
+ // Special case signals a token without expiry. OpenTelemetry
supports reporting infinite values.
+ expiringTokenSeconds.record(Double.POSITIVE_INFINITY);
+ } else if (durationMs > 0) {
+ expiringTokenMinutesMetrics.observe(durationMs / 60_000.0d);
+
expiringTokenSeconds.record(MetricsUtil.convertToSeconds(durationMs,
TimeUnit.MILLISECONDS));
+ } else {
+ // Duration can be negative if token expires at processing time.
OpenTelemetry does not support negative
+ // values, so record token expiry instead.
+ recordTokenExpired();
+ }
+ }
+
+ public void recordTokenExpired() {
+ expiredTokenMetrics.inc();
+ expiredTokensCounter.add(1);
+ }
+
+ @VisibleForTesting
+ @Deprecated
+ public static void reset() {
+ expiredTokenMetrics.clear();
+ expiringTokenMinutesMetrics.clear();
+ }
+}
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasicTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasicTest.java
index 723fde7083d..f6e4b8e969a 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasicTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderBasicTest.java
@@ -52,7 +52,7 @@ public class AuthenticationProviderBasicTest {
Properties properties = new Properties();
properties.setProperty("basicAuthConf", basicAuthConf);
serviceConfiguration.setProperties(properties);
- provider.initialize(serviceConfiguration);
+
provider.initialize(AuthenticationProvider.Context.builder().config(serviceConfiguration).build());
testAuthenticate(provider);
}
@@ -64,7 +64,7 @@ public class AuthenticationProviderBasicTest {
Properties properties = new Properties();
properties.setProperty("basicAuthConf", basicAuthConfBase64);
serviceConfiguration.setProperties(properties);
- provider.initialize(serviceConfiguration);
+
provider.initialize(AuthenticationProvider.Context.builder().config(serviceConfiguration).build());
testAuthenticate(provider);
}
@@ -74,7 +74,7 @@ public class AuthenticationProviderBasicTest {
AuthenticationProviderBasic provider = new
AuthenticationProviderBasic();
ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
System.setProperty("pulsar.auth.basic.conf", basicAuthConf);
- provider.initialize(serviceConfiguration);
+
provider.initialize(AuthenticationProvider.Context.builder().config(serviceConfiguration).build());
testAuthenticate(provider);
}
@@ -84,7 +84,7 @@ public class AuthenticationProviderBasicTest {
AuthenticationProviderBasic provider = new
AuthenticationProviderBasic();
ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
System.setProperty("pulsar.auth.basic.conf", basicAuthConfBase64);
- provider.initialize(serviceConfiguration);
+
provider.initialize(AuthenticationProvider.Context.builder().config(serviceConfiguration).build());
testAuthenticate(provider);
}
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java
index 7793a5c029f..e81198217b5 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderListTest.java
@@ -29,7 +29,6 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
-
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.io.Decoders;
import io.jsonwebtoken.security.Keys;
@@ -90,7 +89,7 @@ public class AuthenticationProviderListTest {
);
ServiceConfiguration confA = new ServiceConfiguration();
confA.setProperties(propertiesA);
- providerA.initialize(confA);
+
providerA.initialize(AuthenticationProvider.Context.builder().config(confA).build());
Properties propertiesB = new Properties();
propertiesB.setProperty(AuthenticationProviderToken.CONF_TOKEN_SETTING_PREFIX,
"b");
@@ -103,7 +102,7 @@ public class AuthenticationProviderListTest {
);
ServiceConfiguration confB = new ServiceConfiguration();
confB.setProperties(propertiesB);
- providerB.initialize(confB);
+
providerB.initialize(AuthenticationProvider.Context.builder().config(confB).build());
this.authProvider = new AuthenticationProviderList(Lists.newArrayList(
providerA, providerB
diff --git
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
index f50731c7654..3e1a3e18034 100644
---
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
+++
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java
@@ -55,7 +55,9 @@ import javax.crypto.SecretKey;
import javax.naming.AuthenticationException;
import javax.servlet.http.HttpServletRequest;
import lombok.Cleanup;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
+import
org.apache.pulsar.broker.authentication.metrics.AuthenticationMetricsToken;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import org.apache.pulsar.common.api.AuthData;
import org.mockito.Mockito;
@@ -70,7 +72,7 @@ public class AuthenticationProviderTokenTest {
AuthenticationProviderToken provider = new
AuthenticationProviderToken();
try {
- provider.initialize(new ServiceConfiguration());
+
provider.initialize(AuthenticationProvider.Context.builder().config(new
ServiceConfiguration()).build());
fail("should have failed");
} catch (IOException e) {
// Expected, secret key was not defined
@@ -135,7 +137,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
try {
provider.authenticate(new AuthenticationDataSource() {
@@ -249,7 +251,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
}
@Test
@@ -268,7 +270,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
String token = AuthTokenUtils.createToken(secretKey, SUBJECT,
Optional.empty());
@@ -303,7 +305,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
String token = AuthTokenUtils.createToken(secretKey, SUBJECT,
Optional.empty());
@@ -335,7 +337,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
String token = AuthTokenUtils.createToken(secretKey, SUBJECT,
Optional.empty());
@@ -370,7 +372,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
// Use private key to generate token
PrivateKey privateKey =
AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKeyStr),
SignatureAlgorithm.RS256);
@@ -413,8 +415,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
-
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
// Use private key to generate token
PrivateKey privateKey =
AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKeyStr),
SignatureAlgorithm.RS256);
@@ -460,7 +461,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
// Use private key to generate token
PrivateKey privateKey =
AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKeyStr),
SignatureAlgorithm.ES256);
@@ -484,8 +485,11 @@ public class AuthenticationProviderTokenTest {
}
@Test(expectedExceptions = AuthenticationException.class)
- public void testAuthenticateWhenNoJwtPassed() throws
AuthenticationException {
+ public void testAuthenticateWhenNoJwtPassed() throws Exception {
+ @Cleanup
AuthenticationProviderToken provider = new
AuthenticationProviderToken();
+ FieldUtils.writeDeclaredField(
+ provider, "authenticationMetricsToken",
mock(AuthenticationMetricsToken.class), true);
provider.authenticate(new AuthenticationDataSource() {
@Override
public boolean hasDataFromCommand() {
@@ -500,8 +504,11 @@ public class AuthenticationProviderTokenTest {
}
@Test(expectedExceptions = AuthenticationException.class)
- public void testAuthenticateWhenAuthorizationHeaderNotExist() throws
AuthenticationException {
+ public void testAuthenticateWhenAuthorizationHeaderNotExist() throws
Exception {
+ @Cleanup
AuthenticationProviderToken provider = new
AuthenticationProviderToken();
+ FieldUtils.writeDeclaredField(
+ provider, "authenticationMetricsToken",
mock(AuthenticationMetricsToken.class), true);
provider.authenticate(new AuthenticationDataSource() {
@Override
public String getHttpHeader(String name) {
@@ -516,8 +523,11 @@ public class AuthenticationProviderTokenTest {
}
@Test(expectedExceptions = AuthenticationException.class)
- public void testAuthenticateWhenAuthHeaderValuePrefixIsInvalid() throws
AuthenticationException {
+ public void testAuthenticateWhenAuthHeaderValuePrefixIsInvalid() throws
Exception {
+ @Cleanup
AuthenticationProviderToken provider = new
AuthenticationProviderToken();
+ FieldUtils.writeDeclaredField(
+ provider, "authenticationMetricsToken",
mock(AuthenticationMetricsToken.class), true);
provider.authenticate(new AuthenticationDataSource() {
@Override
public String getHttpHeader(String name) {
@@ -532,8 +542,11 @@ public class AuthenticationProviderTokenTest {
}
@Test(expectedExceptions = AuthenticationException.class)
- public void testAuthenticateWhenJwtIsBlank() throws
AuthenticationException {
+ public void testAuthenticateWhenJwtIsBlank() throws Exception {
+ @Cleanup
AuthenticationProviderToken provider = new
AuthenticationProviderToken();
+ FieldUtils.writeDeclaredField(
+ provider, "authenticationMetricsToken",
mock(AuthenticationMetricsToken.class), true);
provider.authenticate(new AuthenticationDataSource() {
@Override
public String getHttpHeader(String name) {
@@ -559,7 +572,7 @@ public class AuthenticationProviderTokenTest {
conf.setProperties(properties);
AuthenticationProviderToken provider = new
AuthenticationProviderToken();
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
provider.authenticate(new AuthenticationDataSource() {
@Override
public String getHttpHeader(String name) {
@@ -582,7 +595,7 @@ public class AuthenticationProviderTokenTest {
conf.setProperties(properties);
AuthenticationProviderToken provider = new
AuthenticationProviderToken();
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
}
@Test(expectedExceptions = IOException.class)
@@ -594,7 +607,7 @@ public class AuthenticationProviderTokenTest {
conf.setProperties(properties);
AuthenticationProviderToken provider = new
AuthenticationProviderToken();
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
}
@Test(expectedExceptions = IOException.class)
@@ -606,7 +619,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- new AuthenticationProviderToken().initialize(conf);
+ new
AuthenticationProviderToken().initialize(AuthenticationProvider.Context.builder().config(conf).build());
}
@Test(expectedExceptions = IOException.class)
@@ -618,7 +631,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- new AuthenticationProviderToken().initialize(conf);
+ new
AuthenticationProviderToken().initialize(AuthenticationProvider.Context.builder().config(conf).build());
}
@Test(expectedExceptions = IllegalArgumentException.class)
@@ -633,7 +646,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- new AuthenticationProviderToken().initialize(conf);
+ new
AuthenticationProviderToken().initialize(AuthenticationProvider.Context.builder().config(conf).build());
}
@Test(expectedExceptions = IOException.class)
@@ -645,7 +658,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- new AuthenticationProviderToken().initialize(conf);
+ new
AuthenticationProviderToken().initialize(AuthenticationProvider.Context.builder().config(conf).build());
}
@Test(expectedExceptions = IllegalArgumentException.class)
@@ -657,7 +670,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- new AuthenticationProviderToken().initialize(conf);
+ new
AuthenticationProviderToken().initialize(AuthenticationProvider.Context.builder().config(conf).build());
}
@@ -676,7 +689,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
// Create a token that will expire in 3 seconds
String expiringToken = AuthTokenUtils.createToken(secretKey, SUBJECT,
@@ -709,7 +722,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
// Create a token that is already expired
String expiringToken = AuthTokenUtils.createToken(secretKey, SUBJECT,
@@ -828,7 +841,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
// Use private key to generate token
PrivateKey privateKey =
AuthTokenUtils.decodePrivateKey(Decoders.BASE64.decode(privateKeyStr),
SignatureAlgorithm.RS256);
@@ -877,7 +890,7 @@ public class AuthenticationProviderTokenTest {
);
Mockito.when(mockConf.getProperty(AuthenticationProviderToken.CONF_TOKEN_SETTING_PREFIX)).thenReturn(prefix);
- provider.initialize(mockConf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(mockConf).build());
// Each property is fetched only once. Prevent multiple fetches.
Mockito.verify(mockConf,
Mockito.times(1)).getProperty(AuthenticationProviderToken.CONF_TOKEN_SETTING_PREFIX);
@@ -908,7 +921,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
String token = AuthTokenUtils.createToken(secretKey, SUBJECT,
Optional.empty());
HttpServletRequest servletRequest = mock(HttpServletRequest.class);
@@ -934,7 +947,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
String token = AuthTokenUtils.createToken(secretKey, SUBJECT,
Optional.empty());
HttpServletRequest servletRequest = mock(HttpServletRequest.class);
@@ -960,7 +973,7 @@ public class AuthenticationProviderTokenTest {
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
AuthenticationState authState = provider.newAuthState(null,null, null);
@@ -1016,7 +1029,7 @@ public class AuthenticationProviderTokenTest {
properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_SECRET_KEY,
secretKeyFile.toString());
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
String token = createTokenWithAudience(secretKey, audienceClaim,
audiences);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index e5248d45d42..c0e3e7d356b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -398,7 +398,8 @@ public class BrokerService implements Closeable {
.name("pulsar-backlog-quota-checker")
.numThreads(1)
.build();
- this.authenticationService = new
AuthenticationService(pulsar.getConfiguration());
+ this.authenticationService = new
AuthenticationService(pulsar.getConfiguration(),
+ pulsar.getOpenTelemetry().getOpenTelemetry());
this.blockedDispatchers =
ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
this.topicFactory = createPersistentTopicFactory();
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
index 178da8b8498..065b03e6454 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.stats;
import com.google.common.annotations.VisibleForTesting;
+import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.Meter;
import
io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.io.Closeable;
@@ -50,6 +51,10 @@ public class PulsarBrokerOpenTelemetry implements Closeable {
meter =
openTelemetryService.getOpenTelemetry().getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
}
+ public OpenTelemetry getOpenTelemetry() {
+ return openTelemetryService.getOpenTelemetry();
+ }
+
@Override
public void close() {
openTelemetryService.close();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
index 726bde3f3d0..27bdb2e3004 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
@@ -32,7 +32,7 @@ import lombok.Cleanup;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PrometheusMetricsTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import
org.apache.pulsar.broker.authentication.metrics.AuthenticationMetricsToken;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -52,7 +52,7 @@ public class MetadataStoreStatsTest extends BrokerTestBase {
@Override
protected void setup() throws Exception {
super.baseSetup();
- AuthenticationProviderToken.resetMetrics();
+ AuthenticationMetricsToken.reset();
}
@Override
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthenticationStatsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthenticationStatsTest.java
new file mode 100644
index 00000000000..4cde37b50ff
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthenticationStatsTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static
io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
+import static
org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.opentelemetry.api.common.Attributes;
+import java.time.Duration;
+import java.util.Date;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import javax.crypto.SecretKey;
+import javax.naming.AuthenticationException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
+import
org.apache.pulsar.broker.authentication.metrics.AuthenticationMetricsToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OpenTelemetryAuthenticationStatsTest extends BrokerTestBase {
+
+ private static final Duration AUTHENTICATION_TIMEOUT =
Duration.ofSeconds(1);
+
+ private SecretKey secretKey;
+ private AuthenticationProvider provider;
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.baseSetup();
+
+ secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+ provider = new AuthenticationProviderToken();
+ registerCloseable(provider);
+
+ var properties = new Properties();
+ properties.setProperty("tokenSecretKey",
AuthTokenUtils.encodeKeyBase64(secretKey));
+
+ var conf = new ServiceConfiguration();
+ conf.setProperties(properties);
+
+ var authenticationProviderContext =
AuthenticationProvider.Context.builder()
+ .config(conf)
+ .openTelemetry(pulsar.getOpenTelemetry().getOpenTelemetry())
+ .build();
+ provider.initialize(authenticationProviderContext);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Override
+ protected void
customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+ super.customizeMainPulsarTestContextBuilder(builder);
+ builder.enableOpenTelemetry(true);
+ }
+
+ @Test
+ public void testAuthenticationSuccess() {
+ // Pulsar protocol auth
+ assertThat(provider.authenticateAsync(new
TestAuthenticationDataSource(Optional.empty())))
+ .succeedsWithin(AUTHENTICATION_TIMEOUT);
+
assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(),
+ AuthenticationMetrics.AUTHENTICATION_COUNTER_METRIC_NAME,
+ Attributes.of(AuthenticationMetrics.PROVIDER_KEY,
"AuthenticationProviderToken",
+ AuthenticationMetrics.AUTH_RESULT_KEY, "success",
+ AuthenticationMetrics.AUTH_METHOD_KEY, "token"),
+ 1);
+ }
+
+ @Test
+ public void testTokenDurationHistogram() {
+ // Token with expiry 15 seconds into the future
+ var expiryTime = Optional.of(new Date(System.currentTimeMillis() +
TimeUnit.SECONDS.toMillis(15)));
+ assertThat(provider.authenticateAsync(new
TestAuthenticationDataSource(expiryTime)))
+ .succeedsWithin(AUTHENTICATION_TIMEOUT);
+ // Token without expiry
+ assertThat(provider.authenticateAsync(new
TestAuthenticationDataSource(Optional.empty())))
+ .succeedsWithin(AUTHENTICATION_TIMEOUT);
+
assertThat(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics())
+ .anySatisfy(metric -> assertThat(metric)
+
.hasName(AuthenticationMetricsToken.EXPIRING_TOKEN_HISTOGRAM_METRIC_NAME)
+ .hasHistogramSatisfying(histogram ->
histogram.hasPointsSatisfying(
+ histogramPoint ->
histogramPoint.hasCount(2).hasMax(Double.POSITIVE_INFINITY))));
+ }
+
+ @Test
+ public void testAuthenticationFailure() {
+ // Authentication should fail if credentials not passed.
+ assertThat(provider.authenticateAsync(new AuthenticationDataSource() {
}))
+ .failsWithin(AUTHENTICATION_TIMEOUT)
+ .withThrowableThat()
+ .withRootCauseInstanceOf(AuthenticationException.class)
+ .withMessageContaining("No token credentials passed");
+
assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(),
+ AuthenticationMetrics.AUTHENTICATION_COUNTER_METRIC_NAME,
+ Attributes.of(AuthenticationMetrics.PROVIDER_KEY,
"AuthenticationProviderToken",
+ AuthenticationMetrics.AUTH_RESULT_KEY, "failure",
+ AuthenticationMetrics.AUTH_METHOD_KEY, "token",
+ AuthenticationMetrics.ERROR_CODE_KEY,
"INVALID_AUTH_DATA"),
+ 1);
+ }
+
+ @Test
+ public void testTokenExpired() {
+ var expiredDate = Optional.of(new Date(System.currentTimeMillis() -
TimeUnit.HOURS.toMillis(1)));
+ assertThat(provider.authenticateAsync(new
TestAuthenticationDataSource(expiredDate)))
+ .failsWithin(AUTHENTICATION_TIMEOUT)
+ .withThrowableThat()
+ .withRootCauseInstanceOf(AuthenticationException.class)
+ .withMessageContaining("JWT expired");
+
assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(),
+ AuthenticationMetricsToken.EXPIRED_TOKEN_COUNTER_METRIC_NAME,
Attributes.empty(), 1);
+ }
+
+ private class TestAuthenticationDataSource implements
AuthenticationDataSource {
+ private final String token;
+
+ public TestAuthenticationDataSource(Optional<Date> expiryTime) {
+ token = AuthTokenUtils.createToken(secretKey, "subject",
expiryTime);
+ }
+
+ @Override
+ public boolean hasDataFromCommand() {
+ return true;
+ }
+
+ @Override
+ public String getCommandData() {
+ return token;
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index e7f86d542a0..4df2d36a953 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.stats;
import static com.google.common.base.Preconditions.checkArgument;
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.Metric;
import static
org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsClient.parseMetrics;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -70,7 +71,9 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PrometheusMetricsTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import
org.apache.pulsar.broker.authentication.metrics.AuthenticationMetricsToken;
import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
import
org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
@@ -108,7 +111,7 @@ public class PrometheusMetricsTest extends BrokerTestBase {
@Override
protected void setup() throws Exception {
super.baseSetup();
- AuthenticationProviderToken.resetMetrics();
+ AuthenticationMetricsToken.reset();
}
@Override
@@ -1499,7 +1502,7 @@ public class PrometheusMetricsTest extends BrokerTestBase
{
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
try {
provider.authenticate(new AuthenticationDataSource() {
@@ -1563,7 +1566,7 @@ public class PrometheusMetricsTest extends BrokerTestBase
{
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
Date expiredDate = new Date(System.currentTimeMillis() -
TimeUnit.HOURS.toMillis(1));
String expiredToken = AuthTokenUtils.createToken(secretKey, "subject",
Optional.of(expiredDate));
@@ -1599,6 +1602,7 @@ public class PrometheusMetricsTest extends BrokerTestBase
{
public void testExpiringTokenMetrics() throws Exception {
SecretKey secretKey =
AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+ @Cleanup
AuthenticationProviderToken provider = new
AuthenticationProviderToken();
Properties properties = new Properties();
@@ -1606,7 +1610,7 @@ public class PrometheusMetricsTest extends BrokerTestBase
{
ServiceConfiguration conf = new ServiceConfiguration();
conf.setProperties(properties);
- provider.initialize(conf);
+
provider.initialize(AuthenticationProvider.Context.builder().config(conf).build());
int[] tokenRemainTime = new int[]{3, 7, 40, 100, 400};
@@ -1633,27 +1637,19 @@ public class PrometheusMetricsTest extends
BrokerTestBase {
Metric countMetric = ((List<Metric>)
metrics.get("pulsar_expiring_token_minutes_count")).get(0);
assertEquals(countMetric.value, tokenRemainTime.length);
List<Metric> cm = (List<Metric>)
metrics.get("pulsar_expiring_token_minutes_bucket");
- assertEquals(cm.size(), 5);
+ var buckets = cm.stream().map(m ->
m.tags.get("le")).collect(Collectors.toSet());
+ assertThat(buckets).isEqualTo(Set.of("5.0", "10.0", "60.0", "240.0",
"1440.0", "10080.0", "20160.0", "43200.0",
+ "129600.0", "259200.0", "388800.0", "525600.0", "+Inf"));
cm.forEach((e) -> {
- switch (e.tags.get("le")) {
- case "5.0":
- assertEquals(e.value, 1);
- break;
- case "10.0":
- assertEquals(e.value, 2);
- break;
- case "60.0":
- assertEquals(e.value, 3);
- break;
- case "240.0":
- assertEquals(e.value, 4);
- break;
- default:
- assertEquals(e.value, 5);
- break;
- }
+ var expectedValue = switch(e.tags.get("le")) {
+ case "5.0" -> 1;
+ case "10.0" -> 2;
+ case "60.0" -> 3;
+ case "240.0" -> 4;
+ default -> 5;
+ };
+ assertEquals(e.value, expectedValue);
});
- provider.close();
}
@Test