EronWright commented on code in PR #19849:
URL: https://github.com/apache/pulsar/pull/19849#discussion_r1147908509


##########
pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS;
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS_DEFAULT;
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE;
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE_DEFAULT;
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.incrementFailureMetric;
+import static 
org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsInt;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.naming.AuthenticationException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.asynchttpclient.AsyncHttpClient;
+
+/**
+ * Class used to cache metadata responses from OpenID Providers.
+ */
+class OpenIDProviderMetadataCache {
+
+    private final ObjectReader reader = new 
ObjectMapper().readerFor(OpenIDProviderMetadata.class);
+
+    /**
+     * A loader for the cache that retrieves the metadata from the issuer's 
/.well-known/openid-configuration endpoint.
+     * @return a connection to the issuer's /.well-known/openid-configuration 
endpoint
+     * @throws AuthenticationException if the URL is malformed or there is an 
exception while opening the connection
+     */
+    private AsyncCacheLoader<String, OpenIDProviderMetadata> 
getLoader(AsyncHttpClient client) {
+        return (issuer, executor) ->
+                // TODO OIDC spec 
https://openid.net/specs/openid-connect-discovery-1_0.html#NormalizationSteps
+                // calls for normalization according to RFC3986. Is that 
important to verify here?
+                client
+                    .prepareGet(issuer + "/.well-known/openid-configuration")
+                    .execute()
+                    .toCompletableFuture()
+                    .thenCompose(result -> {
+                        CompletableFuture<OpenIDProviderMetadata> future = new 
CompletableFuture<>();
+                        try {
+                            OpenIDProviderMetadata openIDProviderMetadata =
+                                    
reader.readValue(result.getResponseBodyAsBytes());
+                            verifyIssuer(issuer, openIDProviderMetadata);
+                            future.complete(openIDProviderMetadata);
+                        } catch (AuthenticationException e) {
+                            
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+                            future.completeExceptionally(e);
+                        } catch (Exception e) {
+                            
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+                            future.completeExceptionally(new 
AuthenticationException(
+                                    "Error retrieving OpenID Provider Metadata 
at " + issuer + ": " + e.getMessage()));
+                        }
+                        return future;
+                    });
+    }
+
+    private final AsyncLoadingCache<String, OpenIDProviderMetadata> cache;
+
+    OpenIDProviderMetadataCache(ServiceConfiguration config, AsyncHttpClient 
httpClient) {
+        int maxSize = getConfigValueAsInt(config, CACHE_SIZE, 
CACHE_SIZE_DEFAULT);
+        int expireAfterSeconds = getConfigValueAsInt(config, 
CACHE_EXPIRATION_SECONDS,
+                CACHE_EXPIRATION_SECONDS_DEFAULT);
+        this.cache = Caffeine.newBuilder()
+                .maximumSize(maxSize)
+                .expireAfterWrite(expireAfterSeconds, TimeUnit.SECONDS)
+                .buildAsync(getLoader(httpClient));
+    }
+
+    /**
+     * Retrieve the OpenID Provider Metadata for the provided issuer.
+     * <p>
+     * Note: this method does not do any validation on the parameterized 
issuer. The OpenID Connect discovery
+     * spec requires that the issuer use the HTTPS scheme: 
https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderMetadata.
+     * The {@link AuthenticationProviderOpenID} class handles this 
verification.
+     *
+     * @param issuer - authority from which to retrieve the OpenID Provider 
Metadata
+     * @return the {@link OpenIDProviderMetadata} for the given issuer
+     * @throws AuthenticationException if any exceptions occur while 
retrieving the metadata.
+     */
+    CompletableFuture<OpenIDProviderMetadata> 
getOpenIDProviderMetadataForIssuer(@Nonnull String issuer) {
+        return cache.get(issuer);
+    }
+
+    /**
+     * Verify the issuer url, as required by the OpenID Connect spec:
+     *
+     * Per the OpenID Connect Discovery spec, the issuer value returned MUST 
be identical to the
+     * Issuer URL that was directly used to retrieve the configuration 
information. This MUST also
+     * be identical to the iss Claim value in ID Tokens issued from this 
Issuer.
+     * 
https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderConfigurationValidation
+     *
+     * @param issuer - the issuer used to retrieve the metadata
+     * @param metadata - the OpenID Provider Metadata
+     * @throws AuthenticationException if the issuer does not exactly match 
the metadata issuer
+     */
+    private void verifyIssuer(@Nonnull String issuer, OpenIDProviderMetadata 
metadata) throws AuthenticationException {
+        if (!issuer.equals(metadata.getIssuer())) {
+            
incrementFailureMetric(AuthenticationExceptionCode.ISSUER_MISMATCH);
+            throw new AuthenticationException(String.format("Issuer URL 
mismatch: [%s] should match [%s]",
+                    issuer, metadata.getIssuer()));
+        }
+    }

Review Comment:
   I would suggest that this validation not be performed, because the 
Kubernetes OpenID Discovery feature is not 100% spec-compliant.  In particular, 
the actual `issuer` value may be different from the configured value.  For 
example, here's the discovery document from a GKE cluster:
   
   ```
   $ curl -k https://kubernetes.default.svc/.well-known/openid-configuration
   {
       "issuer": "https://container.googleapis.com/v1/projects/xyz/locations/us-central1/clusters/test-oidc",
       "jwks_uri": "https://35.223.170.12:443/openid/v1/jwks",
       "response_types_supported": [
           "id_token"
       ],
       "subject_types_supported": [
           "public"
       ],
       "id_token_signing_alg_values_supported": [
           "RS256"
       ]
   }
   ```
   



##########
pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java:
##########
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+import static 
org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsBoolean;
+import static 
org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsInt;
+import static 
org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsSet;
+import static 
org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsString;
+import com.auth0.jwk.InvalidPublicKeyException;
+import com.auth0.jwk.Jwk;
+import com.auth0.jwt.JWT;
+import com.auth0.jwt.JWTVerifier;
+import com.auth0.jwt.RegisteredClaims;
+import com.auth0.jwt.algorithms.Algorithm;
+import com.auth0.jwt.exceptions.AlgorithmMismatchException;
+import com.auth0.jwt.exceptions.InvalidClaimException;
+import com.auth0.jwt.exceptions.JWTDecodeException;
+import com.auth0.jwt.exceptions.JWTVerificationException;
+import com.auth0.jwt.exceptions.SignatureVerificationException;
+import com.auth0.jwt.exceptions.TokenExpiredException;
+import com.auth0.jwt.interfaces.Claim;
+import com.auth0.jwt.interfaces.DecodedJWT;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.security.PublicKey;
+import java.security.interfaces.ECPublicKey;
+import java.security.interfaces.RSAPublicKey;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import javax.naming.AuthenticationException;
+import javax.net.ssl.SSLSession;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
+import org.apache.pulsar.common.api.AuthData;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.AsyncHttpClientConfig;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link AuthenticationProvider} implementation that supports the usage of 
a JSON Web Token (JWT)
+ * for client authentication. This implementation retrieves the PublicKey from 
the JWT issuer (assuming the
+ * issuer is in the configured allowed list) and then uses that Public Key to 
verify the validity of the JWT's
+ * signature.
+ *
+ * The Public Keys for a given provider are cached based on certain configured 
parameters to improve performance.
+ * The tradeoff here is that the longer Public Keys are cached, the longer an 
invalidated token could be used. One way
+ * to ensure caches are cleared is to restart all brokers.
+ *
+ * Class is called from multiple threads. The implementation must be thread 
safe. This class expects to be loaded once
+ * and then called concurrently for each new connection. The cache is backed 
by a GuavaCachedJwkProvider, which is
+ * thread-safe.
+ *
+ * Supported algorithms are: RS256, RS384, RS512, ES256, ES384, ES512 where 
the naming conventions follow
+ * this RFC: https://datatracker.ietf.org/doc/html/rfc7518#section-3.1.
+ */
+public class AuthenticationProviderOpenID implements AuthenticationProvider {
+    private static final Logger log = 
LoggerFactory.getLogger(AuthenticationProviderOpenID.class);
+
+    private static final String SIMPLE_NAME = 
AuthenticationProviderOpenID.class.getSimpleName();
+
+    // Must match the value used by the OAuth2 Client Plugin.
+    private static final String AUTH_METHOD_NAME = "token";
+
+    // This is backed by an ObjectMapper, which is thread safe. It is an 
optimization
+    // to share this for decoding JWTs for all connections to this broker.
+    private final JWT jwtLibrary = new JWT();
+
+    private Set<String> issuers;
+
+    // This caches the map from Issuer URL to the jwks_uri served at the 
/.well-known/openid-configuration endpoint
+    private OpenIDProviderMetadataCache openIDProviderMetadataCache;
+
+    // A cache used to store the results of getting the JWKS from the jwks_uri 
for an issuer.
+    private JwksCache jwksCache;
+
+    // A list of supported algorithms. This is the "alg" field on the JWT.
+    // Source for strings: 
https://datatracker.ietf.org/doc/html/rfc7518#section-3.1.
+    private static final String ALG_RS256 = "RS256";
+    private static final String ALG_RS384 = "RS384";
+    private static final String ALG_RS512 = "RS512";
+    private static final String ALG_ES256 = "ES256";
+    private static final String ALG_ES384 = "ES384";
+    private static final String ALG_ES512 = "ES512";
+
+    private long acceptedTimeLeewaySeconds;
+    private boolean requireHttps;
+    private String roleClaim;
+
+    static final String ALLOWED_TOKEN_ISSUERS = "openIDAllowedTokenIssuers";
+    static final String ALLOWED_AUDIENCES = "openIDAllowedAudiences";
+    static final String ROLE_CLAIM = "openIDRoleClaim";
+    static final String ROLE_CLAIM_DEFAULT = "sub";
+    static final String ACCEPTED_TIME_LEEWAY_SECONDS = 
"openIDAcceptedTimeLeewaySeconds";
+    static final int ACCEPTED_TIME_LEEWAY_SECONDS_DEFAULT = 0;
+    static final String CACHE_SIZE = "openIDCacheSize";
+    static final int CACHE_SIZE_DEFAULT = 5;
+    static final String CACHE_EXPIRATION_SECONDS = 
"openIDCacheExpirationSeconds";
+    static final int CACHE_EXPIRATION_SECONDS_DEFAULT = 24 * 60 * 60;
+    static final String HTTP_CONNECTION_TIMEOUT_MILLIS = 
"openIDHttpConnectionTimeoutMillis";
+    static final int HTTP_CONNECTION_TIMEOUT_MILLIS_DEFAULT = 10_000;
+    static final String HTTP_READ_TIMEOUT_MILLIS = 
"openIDHttpReadTimeoutMillis";
+    static final int HTTP_READ_TIMEOUT_MILLIS_DEFAULT = 10_000;
+    static final String REQUIRE_HTTPS = "openIDRequireHttps";
+    static final boolean REQUIRE_HTTPS_DEFAULT = true;
+
+    // The list of audiences that are allowed to connect to this broker. A 
valid JWT must contain one of the audiences.
+    private String[] allowedAudiences;
+
+    @Override
+    public void initialize(ServiceConfiguration config) throws IOException {
+        this.allowedAudiences = 
validateAllowedAudiences(getConfigValueAsSet(config, ALLOWED_AUDIENCES));
+        this.roleClaim = getConfigValueAsString(config, ROLE_CLAIM, 
ROLE_CLAIM_DEFAULT);
+        this.acceptedTimeLeewaySeconds = getConfigValueAsInt(config, 
ACCEPTED_TIME_LEEWAY_SECONDS,
+                ACCEPTED_TIME_LEEWAY_SECONDS_DEFAULT);
+        this.requireHttps = getConfigValueAsBoolean(config, REQUIRE_HTTPS, 
REQUIRE_HTTPS_DEFAULT);
+        this.issuers = validateIssuers(getConfigValueAsSet(config, 
ALLOWED_TOKEN_ISSUERS));
+
+        int connectionTimeout = getConfigValueAsInt(config, 
HTTP_CONNECTION_TIMEOUT_MILLIS,
+                HTTP_CONNECTION_TIMEOUT_MILLIS_DEFAULT);
+        int readTimeout = getConfigValueAsInt(config, 
HTTP_READ_TIMEOUT_MILLIS, HTTP_READ_TIMEOUT_MILLIS_DEFAULT);
+        // TODO do we want to easily support custom TLS configuration? It'd be 
available via the JVM's args.
+        AsyncHttpClientConfig clientConfig = new 
DefaultAsyncHttpClientConfig.Builder()
+                .setConnectTimeout(connectionTimeout)
+                .setReadTimeout(readTimeout)
+                .build();
+        AsyncHttpClient httpClient = new DefaultAsyncHttpClient(clientConfig);
+
+        this.openIDProviderMetadataCache = new 
OpenIDProviderMetadataCache(config, httpClient);
+        this.jwksCache = new JwksCache(config, httpClient);
+    }
+
+    @Override
+    public String getAuthMethodName() {
+        return AUTH_METHOD_NAME;
+    }
+
+    /**
+     * Authenticate the parameterized {@link AuthenticationDataSource} by 
verifying the issuer is an allowed issuer,
+     * then retrieving the JWKS URI from the issuer, then retrieving the 
Public key from the JWKS URI, and finally
+     * verifying the JWT signature and claims.
+     *
+     * @param authData - the authData passed by the Pulsar Broker containing 
the token.
+     * @return the role, if the JWT is authenticated, otherwise a failed 
future.
+     */
+    @Override
+    public CompletableFuture<String> 
authenticateAsync(AuthenticationDataSource authData) {
+        return authenticateTokenAsync(authData).thenApply(this::getRole);
+    }
+
+    /**
+     * Authenticate the parameterized {@link AuthenticationDataSource} and 
return the decoded JWT.
+     * @param authData - the authData containing the token.
+     * @return a completed future with the decoded JWT, if the JWT is 
authenticated. Otherwise, a failed future.
+     */
+    CompletableFuture<DecodedJWT> 
authenticateTokenAsync(AuthenticationDataSource authData) {
+        String token;
+        try {
+            token = AuthenticationProviderToken.getToken(authData);
+        } catch (AuthenticationException e) {
+            
incrementFailureMetric(AuthenticationExceptionCode.ERROR_DECODING_JWT);
+            return CompletableFuture.failedFuture(e);
+        }
+        return authenticateToken(token)
+                .whenComplete((jwt, e) -> {
+                    if (jwt != null) {
+                        
AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), 
getAuthMethodName());
+                    }
+                    // Failure metrics are incremented within methods above
+                });
+    }
+
+    /**
+     * Get the role from a JWT at the configured role claim field.
+     * NOTE: does not do any verification of the JWT
+     * @param jwt - token to get the role from
+     * @return the role, or null, if it is not set on the JWT
+     */
+    String getRole(DecodedJWT jwt) {
+        try {
+            Claim roleClaim = jwt.getClaim(this.roleClaim);
+            if (roleClaim.isNull()) {
+                // The claim was not present in the JWT
+                return null;
+            }
+
+            String role = roleClaim.asString();
+            if (role != null) {
+                // The role is non null only if the JSON node is a text field
+                return role;
+            }
+
+            List<String> roles = 
jwt.getClaim(this.roleClaim).asList(String.class);
+            if (roles == null || roles.size() == 0) {
+                return null;
+            } else if (roles.size() == 1) {
+                return roles.get(0);
+            } else {
+                log.debug("JWT for subject [{}] has multiple roles; using the 
first one.", jwt.getSubject());
+                return roles.get(0);
+            }
+        } catch (JWTDecodeException e) {
+            log.error("Exception while retrieving role from JWT", e);
+            return null;
+        }
+    }
+
+    /**
+     * Convert a JWT string into a {@link DecodedJWT}
+     * The benefit of using this method is that it utilizes the already 
instantiated {@link JWT} parser.
+     * WARNING: this method does not verify the authenticity of the token. It 
only decodes it.
+     *
+     * @param token - string JWT to be decoded
+     * @return a decoded JWT
+     * @throws AuthenticationException if the token string is null or if any 
part of the token contains
+     *         an invalid jwt or JSON format of each of the jwt parts.
+     */
+    DecodedJWT decodeJWT(String token) throws AuthenticationException {
+        if (token == null) {
+            
incrementFailureMetric(AuthenticationExceptionCode.ERROR_DECODING_JWT);
+            throw new AuthenticationException("Invalid token: cannot be null");
+        }
+        try {
+            return jwtLibrary.decodeJwt(token);
+        } catch (JWTDecodeException e) {
+            
incrementFailureMetric(AuthenticationExceptionCode.ERROR_DECODING_JWT);
+            throw new AuthenticationException("Unable to decode JWT: " + 
e.getMessage());
+        }
+    }
+
+    /**
+     * Authenticate the parameterized JWT.
+     *
+     * @param token - a nonnull JWT to authenticate
+     * @return a fully authenticated JWT, or AuthenticationException if the 
JWT is proven to be invalid in any way
+     */
+    private CompletableFuture<DecodedJWT> authenticateToken(String token) {
+        if (token == null) {
+            
incrementFailureMetric(AuthenticationExceptionCode.ERROR_DECODING_JWT);
+            return CompletableFuture.failedFuture(new 
AuthenticationException("JWT cannot be null"));
+        }
+        final DecodedJWT jwt;
+        try {
+            jwt = decodeJWT(token);
+        } catch (AuthenticationException e) {
+            
incrementFailureMetric(AuthenticationExceptionCode.ERROR_DECODING_JWT);
+            return CompletableFuture.failedFuture(e);
+        }
+        return verifyIssuerAndGetJwk(jwt)
+                .thenCompose(jwk -> {
+                    try {
+                        if (!jwt.getAlgorithm().equals(jwk.getAlgorithm())) {
+                            
incrementFailureMetric(AuthenticationExceptionCode.ALGORITHM_MISMATCH);
+                            return CompletableFuture.failedFuture(
+                                    new AuthenticationException("JWK's alg [" 
+ jwk.getAlgorithm()
+                                            + "] does not match JWT's alg [" + 
jwt.getAlgorithm() + "]"));
+                        }
+                        // Verify the JWT signature
+                        // Throws exception if any verification check fails
+                        return CompletableFuture
+                                .completedFuture(verifyJWT(jwk.getPublicKey(), 
jwt.getAlgorithm(), jwt));
+                    } catch (InvalidPublicKeyException e) {
+                        
incrementFailureMetric(AuthenticationExceptionCode.INVALID_PUBLIC_KEY);
+                        return CompletableFuture.failedFuture(
+                                new AuthenticationException("Invalid public 
key: " + e.getMessage()));
+                    } catch (AuthenticationException e) {
+                        return CompletableFuture.failedFuture(e);
+                    }
+                });
+    }
+
+    private CompletableFuture<Jwk> verifyIssuerAndGetJwk(DecodedJWT jwt) {
+        // Verify that the issuer claim is nonnull and allowed.
+        if (jwt.getIssuer() == null || 
!this.issuers.contains(jwt.getIssuer())) {
+            
incrementFailureMetric(AuthenticationExceptionCode.UNSUPPORTED_ISSUER);
+            return CompletableFuture
+                    .failedFuture(new AuthenticationException("Issuer not 
allowed: " + jwt.getIssuer()));
+        }
+        // Retrieve the metadata: 
https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderMetadata
+        return 
openIDProviderMetadataCache.getOpenIDProviderMetadataForIssuer(jwt.getIssuer())
+                .thenCompose(metadata -> 
jwksCache.getJwk(metadata.getJwksUri(), jwt.getKeyId()));

Review Comment:
   Note that the value of `jwt.getIssuer()` here would match the _actual_ value 
of the `issuer` field in the metadata document.  It would not match the 
_configured_ issuer URL; see the GKE example elsewhere in this review.



##########
pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS;
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS_DEFAULT;
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE;
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE_DEFAULT;
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.incrementFailureMetric;
+import static 
org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsInt;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.naming.AuthenticationException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.asynchttpclient.AsyncHttpClient;
+
+/**
+ * Class used to cache metadata responses from OpenID Providers.
+ */
+class OpenIDProviderMetadataCache {
+
+    private final ObjectReader reader = new 
ObjectMapper().readerFor(OpenIDProviderMetadata.class);
+
+    /**
+     * A loader for the cache that retrieves the metadata from the issuer's 
/.well-known/openid-configuration endpoint.
+     * @return a connection to the issuer's /.well-known/openid-configuration 
endpoint
+     * @throws AuthenticationException if the URL is malformed or there is an 
exception while opening the connection
+     */
+    private AsyncCacheLoader<String, OpenIDProviderMetadata> 
getLoader(AsyncHttpClient client) {
+        return (issuer, executor) ->
+                // TODO OIDC spec 
https://openid.net/specs/openid-connect-discovery-1_0.html#NormalizationSteps
+                // calls for normalization according to RFC3986. Is that 
important to verify here?
+                client
+                    .prepareGet(issuer + "/.well-known/openid-configuration")

Review Comment:
   I have observed that Azure AD has a weird issuer URL that does NOT have a 
trailing slash, whereas other environments do.  The normal rules of URL 
concatenation cannot be used here.  Please normalize the `issuer` parameter by 
stripping any trailing slash before concatenating the 
`/.well-known/openid-configuration` suffix.



##########
pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS;
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS_DEFAULT;
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE;
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE_DEFAULT;
+import static 
org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.incrementFailureMetric;
+import static 
org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsInt;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.naming.AuthenticationException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.asynchttpclient.AsyncHttpClient;
+
+/**
+ * Class used to cache metadata responses from OpenID Providers.
+ */
+class OpenIDProviderMetadataCache {
+
+    private final ObjectReader reader = new 
ObjectMapper().readerFor(OpenIDProviderMetadata.class);
+
+    /**
+     * A loader for the cache that retrieves the metadata from the issuer's 
/.well-known/openid-configuration endpoint.
+     * @return a connection to the issuer's /.well-known/openid-configuration 
endpoint
+     * @throws AuthenticationException if the URL is malformed or there is an 
exception while opening the connection
+     */
+    private AsyncCacheLoader<String, OpenIDProviderMetadata> 
getLoader(AsyncHttpClient client) {
+        return (issuer, executor) ->
+                // TODO OIDC spec 
https://openid.net/specs/openid-connect-discovery-1_0.html#NormalizationSteps
+                // calls for normalization according to RFC3986. Is that 
important to verify here?
+                client
+                    .prepareGet(issuer + "/.well-known/openid-configuration")
+                    .execute()
+                    .toCompletableFuture()
+                    .thenCompose(result -> {
+                        CompletableFuture<OpenIDProviderMetadata> future = new 
CompletableFuture<>();
+                        try {
+                            OpenIDProviderMetadata openIDProviderMetadata =
+                                    
reader.readValue(result.getResponseBodyAsBytes());
+                            verifyIssuer(issuer, openIDProviderMetadata);
+                            future.complete(openIDProviderMetadata);
+                        } catch (AuthenticationException e) {
+                            
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+                            future.completeExceptionally(e);
+                        } catch (Exception e) {
+                            
incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+                            future.completeExceptionally(new 
AuthenticationException(
+                                    "Error retrieving OpenID Provider Metadata 
at " + issuer + ": " + e.getMessage()));
+                        }
+                        return future;

Review Comment:
   By whatever means, please use the actual value of 
`OpenIDProviderMetadata::getIssuer()` as the key into the cache, as opposed to 
the configured issuer value. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to