This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b789d825feb [feat][client] oauth2 trustcerts file and timeouts (#24944)
b789d825feb is described below

commit b789d825feb3975092b2da59dea10e512d5d0b42
Author: gulecroc <[email protected]>
AuthorDate: Tue Nov 11 16:20:42 2025 +0100

    [feat][client] oauth2 trustcerts file and timeouts (#24944)
---
 .../auth/oauth2/AuthenticationFactoryOAuth2.java   | 154 ++++++++++++++++++---
 .../impl/auth/oauth2/ClientCredentialsFlow.java    | 110 ++++++++-------
 .../pulsar/client/impl/auth/oauth2/FlowBase.java   |  76 +++++++++-
 .../oauth2/protocol/DefaultMetadataResolver.java   |  95 ++++++-------
 .../impl/auth/oauth2/protocol/TokenClient.java     |  59 +++-----
 .../oauth2/AuthenticationFactoryOAuth2Test.java    |  59 ++++++++
 6 files changed, 395 insertions(+), 158 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
index c9abb3a3c01..033d5308a2a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.auth.oauth2;
 
 import java.net.URL;
 import java.time.Clock;
+import java.time.Duration;
 import org.apache.pulsar.client.api.Authentication;
 
 /**
@@ -31,9 +32,9 @@ public final class AuthenticationFactoryOAuth2 {
     /**
      * Authenticate with client credentials.
      *
-     * @param issuerUrl the issuer URL
+     * @param issuerUrl      the issuer URL
      * @param credentialsUrl the credentials URL
-     * @param audience An optional field. The audience identifier used by some 
Identity Providers, like Auth0.
+     * @param audience       An optional field. The audience identifier used 
by some Identity Providers, like Auth0.
      * @return an Authentication object
      */
     public static Authentication clientCredentials(URL issuerUrl, URL 
credentialsUrl, String audience) {
@@ -43,23 +44,144 @@ public final class AuthenticationFactoryOAuth2 {
     /**
      * Authenticate with client credentials.
      *
-     * @param issuerUrl the issuer URL
+     * @param issuerUrl      the issuer URL
      * @param credentialsUrl the credentials URL
-     * @param audience An optional field. The audience identifier used by some 
Identity Providers, like Auth0.
-     * @param scope An optional field. The value of the scope parameter is 
expressed as a list of space-delimited,
-     *              case-sensitive strings. The strings are defined by the 
authorization server.
-     *              If the value contains multiple space-delimited strings, 
their order does not matter,
-     *              and each string adds an additional access range to the 
requested scope.
-     *              From here: 
https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
+     * @param audience       An optional field. The audience identifier used 
by some Identity Providers, like Auth0.
+     * @param scope          An optional field. The value of the scope 
parameter is expressed as a list of
+     *                       space-delimited,
+     *                       case-sensitive strings. The strings are defined 
by the authorization server.
+     *                       If the value contains multiple space-delimited 
strings, their order does not matter,
+     *                       and each string adds an additional access range 
to the requested scope.
+     *                       From here: 
https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
      * @return an Authentication object
      */
     public static Authentication clientCredentials(URL issuerUrl, URL 
credentialsUrl, String audience, String scope) {
-        ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
-                .issuerUrl(issuerUrl)
-                .privateKey(credentialsUrl.toExternalForm())
-                .audience(audience)
-                .scope(scope)
-                .build();
-        return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
+        return 
clientCredentialsBuilder().issuerUrl(issuerUrl).credentialsUrl(credentialsUrl).audience(audience)
+                .scope(scope).build();
     }
+
+    /**
+     * A builder to create an authentication with client credentials.
+     *
+     * @return the builder
+     */
+    public static ClientCredentialsBuilder clientCredentialsBuilder() {
+        return new ClientCredentialsBuilder();
+    }
+
+    public static class ClientCredentialsBuilder {
+
+        private URL issuerUrl;
+        private URL credentialsUrl;
+        private String audience;
+        private String scope;
+        private Duration connectTimeout;
+        private Duration readTimeout;
+        private String trustCertsFilePath;
+
+        private ClientCredentialsBuilder() {
+        }
+
+        /**
+         * Required issuer URL.
+         *
+         * @param issuerUrl the issuer URL
+         * @return the builder
+         */
+        public ClientCredentialsBuilder issuerUrl(URL issuerUrl) {
+            this.issuerUrl = issuerUrl;
+            return this;
+        }
+
+        /**
+         * Required credentials URL.
+         *
+         * @param credentialsUrl the credentials URL
+         * @return the builder
+         */
+        public ClientCredentialsBuilder credentialsUrl(URL credentialsUrl) {
+            this.credentialsUrl = credentialsUrl;
+            return this;
+        }
+
+        /**
+         * Optional audience identifier used by some Identity Providers, like 
Auth0.
+         *
+         * @param audience the audiance
+         * @return the builder
+         */
+        public ClientCredentialsBuilder audience(String audience) {
+            this.audience = audience;
+            return this;
+        }
+
+        /**
+         * Optional scope expressed as a list of space-delimited, 
case-sensitive strings.
+         * The strings are defined by the authorization server.
+         * If the value contains multiple space-delimited strings, their order 
does not matter,
+         * and each string adds an additional access range to the requested 
scope.
+         * From here: 
https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
+         *
+         * @param scope the scope
+         * @return the builder
+         */
+        public ClientCredentialsBuilder scope(String scope) {
+            this.scope = scope;
+            return this;
+        }
+
+        /**
+         * Optional HTTP connection timeout.
+         *
+         * @param connectTimeout the connect timeout
+         * @return the builder
+         */
+        public ClientCredentialsBuilder connectTimeout(Duration 
connectTimeout) {
+            this.connectTimeout = connectTimeout;
+            return this;
+        }
+
+        /**
+         * Optional HTTP read timeout.
+         *
+         * @param readTimeout the read timeout
+         * @return the builder
+         */
+        public ClientCredentialsBuilder readTimeout(Duration readTimeout) {
+            this.readTimeout = readTimeout;
+            return this;
+        }
+
+        /**
+         * Optional path to the file containing the trusted certificate(s) of 
the token issuer.
+         *
+         * @param trustCertsFilePath the path to the file containing the 
trusted certificate(s)
+         * @return the builder
+         */
+        public ClientCredentialsBuilder trustCertsFilePath(String 
trustCertsFilePath) {
+            this.trustCertsFilePath = trustCertsFilePath;
+            return this;
+        }
+
+        /**
+         * Authenticate with client credentials.
+         *
+         * @return an Authentication object
+         */
+        public Authentication build() {
+            ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
+                    .issuerUrl(issuerUrl)
+                    .privateKey(credentialsUrl == null ? null : 
credentialsUrl.toExternalForm())
+                    .audience(audience)
+                    .scope(scope)
+                    .connectTimeout(connectTimeout)
+                    .readTimeout(readTimeout)
+                    .trustCertsFilePath(trustCertsFilePath)
+                    .build();
+            return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
+        }
+
+    }
+
+
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
index ef10f1afdb6..7f64c0b18ac 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -26,6 +26,7 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLConnection;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.Map;
 import lombok.Builder;
 import lombok.extern.slf4j.Slf4j;
@@ -60,62 +61,17 @@ class ClientCredentialsFlow extends FlowBase {
     private boolean initialized = false;
 
     @Builder
-    public ClientCredentialsFlow(URL issuerUrl, String audience, String 
privateKey, String scope) {
-        super(issuerUrl);
+    public ClientCredentialsFlow(URL issuerUrl, String audience, String 
privateKey, String scope,
+                                 Duration connectTimeout, Duration 
readTimeout, String trustCertsFilePath) {
+        super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath);
         this.audience = audience;
         this.privateKey = privateKey;
         this.scope = scope;
     }
 
-    @Override
-    public void initialize() throws PulsarClientException {
-        super.initialize();
-        assert this.metadata != null;
-
-        URL tokenUrl = this.metadata.getTokenEndpoint();
-        this.exchanger = new TokenClient(tokenUrl);
-        initialized = true;
-    }
-
-    public TokenResult authenticate() throws PulsarClientException {
-        // read the private key from storage
-        KeyFile keyFile;
-        try {
-            keyFile = loadPrivateKey(this.privateKey);
-        } catch (IOException e) {
-            throw new PulsarClientException.AuthenticationException("Unable to 
read private key: " + e.getMessage());
-        }
-
-        // request an access token using client credentials
-        ClientCredentialsExchangeRequest req = 
ClientCredentialsExchangeRequest.builder()
-                .clientId(keyFile.getClientId())
-                .clientSecret(keyFile.getClientSecret())
-                .audience(this.audience)
-                .scope(this.scope)
-                .build();
-        TokenResult tr;
-        if (!initialized) {
-            initialize();
-        }
-        try {
-            tr = this.exchanger.exchangeClientCredentials(req);
-        } catch (TokenExchangeException | IOException e) {
-            throw new PulsarClientException.AuthenticationException("Unable to 
obtain an access token: "
-                                                                    + 
e.getMessage());
-        }
-
-        return tr;
-    }
-
-    @Override
-    public void close() throws Exception {
-        if (exchanger != null) {
-            exchanger.close();
-        }
-    }
-
     /**
      * Constructs a {@link ClientCredentialsFlow} from configuration 
parameters.
+     *
      * @param params
      * @return
      */
@@ -125,16 +81,24 @@ class ClientCredentialsFlow extends FlowBase {
         // These are optional parameters, so we only perform a get
         String scope = params.get(CONFIG_PARAM_SCOPE);
         String audience = params.get(CONFIG_PARAM_AUDIENCE);
+        Duration connectTimeout = parseParameterDuration(params, 
CONFIG_PARAM_CONNECT_TIMEOUT);
+        Duration readTimeout = parseParameterDuration(params, 
CONFIG_PARAM_READ_TIMEOUT);
+        String trustCertsFilePath = 
params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH);
+
         return ClientCredentialsFlow.builder()
                 .issuerUrl(issuerUrl)
                 .audience(audience)
                 .privateKey(privateKeyUrl)
                 .scope(scope)
+                .connectTimeout(connectTimeout)
+                .readTimeout(readTimeout)
+                .trustCertsFilePath(trustCertsFilePath)
                 .build();
     }
 
     /**
      * Loads the private key from the given URL.
+     *
      * @param privateKeyURL
      * @return
      * @throws IOException
@@ -162,4 +126,52 @@ class ClientCredentialsFlow extends FlowBase {
             throw new IOException("Invalid privateKey format", e);
         }
     }
+
+    @Override
+    public void initialize() throws PulsarClientException {
+        super.initialize();
+        assert this.metadata != null;
+
+        URL tokenUrl = this.metadata.getTokenEndpoint();
+        this.exchanger = new TokenClient(tokenUrl, httpClient);
+        initialized = true;
+    }
+
+    public TokenResult authenticate() throws PulsarClientException {
+        // read the private key from storage
+        KeyFile keyFile;
+        try {
+            keyFile = loadPrivateKey(this.privateKey);
+        } catch (IOException e) {
+            throw new PulsarClientException.AuthenticationException("Unable to 
read private key: " + e.getMessage());
+        }
+
+        // request an access token using client credentials
+        ClientCredentialsExchangeRequest req = 
ClientCredentialsExchangeRequest.builder()
+                .clientId(keyFile.getClientId())
+                .clientSecret(keyFile.getClientSecret())
+                .audience(this.audience)
+                .scope(this.scope)
+                .build();
+        TokenResult tr;
+        if (!initialized) {
+            initialize();
+        }
+        try {
+            tr = this.exchanger.exchangeClientCredentials(req);
+        } catch (TokenExchangeException | IOException e) {
+            throw new PulsarClientException.AuthenticationException("Unable to 
obtain an access token: "
+                    + e.getMessage());
+        }
+
+        return tr;
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (exchanger != null) {
+            exchanger.close();
+        }
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
index 125a8800862..6cc9f8e41b5 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
@@ -18,16 +18,25 @@
  */
 package org.apache.pulsar.client.impl.auth.oauth2;
 
+import io.netty.handler.ssl.SslContextBuilder;
+import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.time.Duration;
+import java.time.format.DateTimeParseException;
 import java.util.Map;
+import javax.net.ssl.SSLException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.client.api.PulsarClientException;
 import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
 import org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata;
 import org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
 
 /**
  * An abstract OAuth 2.0 authorization flow.
@@ -35,14 +44,60 @@ import 
org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver;
 @Slf4j
 abstract class FlowBase implements Flow {
 
+    public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout";
+    public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout";
+    public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH = 
"trustCertsFilePath";
+
+    protected static final Duration DEFAULT_CONNECT_TIMEOUT = 
Duration.ofSeconds(10);
+    protected static final Duration DEFAULT_READ_TIMEOUT = 
Duration.ofSeconds(30);
+
     private static final long serialVersionUID = 1L;
 
     protected final URL issuerUrl;
+    protected final AsyncHttpClient httpClient;
 
     protected transient Metadata metadata;
 
-    protected FlowBase(URL issuerUrl) {
+    protected FlowBase(URL issuerUrl, Duration connectTimeout, Duration 
readTimeout, String trustCertsFilePath) {
         this.issuerUrl = issuerUrl;
+        this.httpClient = defaultHttpClient(readTimeout, connectTimeout, 
trustCertsFilePath);
+    }
+
+    private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration 
connectTimeout,
+                                              String trustCertsFilePath) {
+        DefaultAsyncHttpClientConfig.Builder confBuilder = new 
DefaultAsyncHttpClientConfig.Builder();
+        confBuilder.setCookieStore(null);
+        confBuilder.setUseProxyProperties(true);
+        confBuilder.setFollowRedirect(true);
+        confBuilder.setConnectTimeout(
+                getParameterDurationToMillis(CONFIG_PARAM_CONNECT_TIMEOUT, 
connectTimeout,
+                        DEFAULT_CONNECT_TIMEOUT));
+        confBuilder.setReadTimeout(
+                getParameterDurationToMillis(CONFIG_PARAM_READ_TIMEOUT, 
readTimeout, DEFAULT_READ_TIMEOUT));
+        confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", 
PulsarVersion.getVersion()));
+        if (StringUtils.isNotBlank(trustCertsFilePath)) {
+            try {
+                confBuilder.setSslContext(SslContextBuilder.forClient()
+                        .trustManager(new File(trustCertsFilePath))
+                        .build());
+            } catch (SSLException e) {
+                log.error("Could not set " + 
CONFIG_PARAM_TRUST_CERTS_FILE_PATH, e);
+            }
+        }
+        return new DefaultAsyncHttpClient(confBuilder.build());
+    }
+
+    private int getParameterDurationToMillis(String name, Duration value, 
Duration defaultValue) {
+        Duration duration;
+        if (value == null) {
+            log.info("Configuration for [{}] is using the default value: 
[{}]", name, defaultValue);
+            duration = defaultValue;
+        } else {
+            log.info("Configuration for [{}] is: [{}]", name, value);
+            duration = value;
+        }
+
+        return (int) duration.toMillis();
     }
 
     public void initialize() throws PulsarClientException {
@@ -55,7 +110,7 @@ abstract class FlowBase implements Flow {
     }
 
     protected MetadataResolver createMetadataResolver() {
-        return DefaultMetadataResolver.fromIssuerUrl(issuerUrl);
+        return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient);
     }
 
     static String parseParameterString(Map<String, String> params, String 
name) {
@@ -77,4 +132,21 @@ abstract class FlowBase implements Flow {
             throw new IllegalArgumentException("Malformed configuration 
parameter: " + name);
         }
     }
+
+    static Duration parseParameterDuration(Map<String, String> params, String 
name) {
+        String value = params.get(name);
+        if (StringUtils.isNotBlank(value)) {
+            try {
+                return Duration.parse(value);
+            } catch (DateTimeParseException e) {
+                throw new IllegalArgumentException("Malformed configuration 
parameter: " + name, e);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void close() throws Exception {
+        httpClient.close();
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
index be636145cb2..19d0c1acadd 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
@@ -19,88 +19,50 @@
 package org.apache.pulsar.client.impl.auth.oauth2.protocol;
 
 import com.fasterxml.jackson.databind.ObjectReader;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.net.URL;
-import java.net.URLConnection;
-import java.time.Duration;
+import java.util.concurrent.ExecutionException;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.Response;
 
 /**
  * Resolves OAuth 2.0 authorization server metadata as described in RFC 8414.
  */
 public class DefaultMetadataResolver implements MetadataResolver {
 
-    protected static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
-    protected static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
-
     private final URL metadataUrl;
     private final ObjectReader objectReader;
-    private Duration connectTimeout;
-    private Duration readTimeout;
+    private final AsyncHttpClient httpClient;
 
-    public DefaultMetadataResolver(URL metadataUrl) {
+    public DefaultMetadataResolver(URL metadataUrl, AsyncHttpClient 
httpClient) {
         this.metadataUrl = metadataUrl;
         this.objectReader = 
ObjectMapperFactory.getMapper().reader().forType(Metadata.class);
-        // set a default timeout to ensure that this doesn't block
-        this.connectTimeout = 
Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS);
-        this.readTimeout = Duration.ofSeconds(DEFAULT_READ_TIMEOUT_IN_SECONDS);
-    }
-
-    public DefaultMetadataResolver withConnectTimeout(Duration connectTimeout) 
{
-        this.connectTimeout = connectTimeout;
-        return this;
-    }
-
-    public DefaultMetadataResolver withReadTimeout(Duration readTimeout) {
-        this.readTimeout = readTimeout;
-        return this;
-    }
-
-    /**
-     * Resolves the authorization metadata.
-     * @return metadata
-     * @throws IOException if the metadata could not be resolved.
-     */
-    public Metadata resolve() throws IOException {
-        try {
-            URLConnection c = this.metadataUrl.openConnection();
-            if (connectTimeout != null) {
-                c.setConnectTimeout((int) connectTimeout.toMillis());
-            }
-            if (readTimeout != null) {
-                c.setReadTimeout((int) readTimeout.toMillis());
-            }
-            c.setRequestProperty("Accept", "application/json");
-
-            Metadata metadata;
-            try (InputStream inputStream = c.getInputStream()) {
-                metadata = this.objectReader.readValue(inputStream);
-            }
-            return metadata;
-
-        } catch (IOException e) {
-            throw new IOException("Cannot obtain authorization metadata from " 
+ metadataUrl.toString(), e);
-        }
+        this.httpClient = httpClient;
     }
 
     /**
      * Gets a well-known metadata URL for the given OAuth issuer URL.
+     *
      * @param issuerUrl The authorization server's issuer identifier
      * @return a resolver
      */
-    public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl) {
-        return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl));
+    public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl, 
AsyncHttpClient httpClient) {
+        return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl), 
httpClient);
     }
 
     /**
      * Gets a well-known metadata URL for the given OAuth issuer URL.
-     * @see <a 
href="https://tools.ietf.org/id/draft-ietf-oauth-discovery-08.html#ASConfig";>
-     *     OAuth Discovery: Obtaining Authorization Server Metadata</a>
+     *
      * @param issuerUrl The authorization server's issuer identifier
      * @return a URL
+     * @see <a 
href="https://tools.ietf.org/id/draft-ietf-oauth-discovery-08.html#ASConfig";>
+     * OAuth Discovery: Obtaining Authorization Server Metadata</a>
      */
     public static URL getWellKnownMetadataUrl(URL issuerUrl) {
         try {
@@ -109,4 +71,33 @@ public class DefaultMetadataResolver implements 
MetadataResolver {
             throw new IllegalArgumentException(e);
         }
     }
+
+    /**
+     * Resolves the authorization metadata.
+     *
+     * @return metadata
+     * @throws IOException if the metadata could not be resolved.
+     */
+    public Metadata resolve() throws IOException {
+
+        try {
+            Response response = httpClient.prepareGet(metadataUrl.toString())
+                    .addHeader(HttpHeaderNames.ACCEPT, 
HttpHeaderValues.APPLICATION_JSON)
+                    .execute()
+                    .toCompletableFuture()
+                    .get();
+
+            Metadata metadata;
+            try (InputStream inputStream = response.getResponseBodyAsStream()) 
{
+                metadata = this.objectReader.readValue(inputStream);
+            }
+            return metadata;
+
+        } catch (IOException | InterruptedException | ExecutionException e) {
+            if (e instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
+            throw new IOException("Cannot obtain authorization metadata from " 
+ metadataUrl, e);
+        }
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
index f4e4c770e67..cb4c2a551d0 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -27,12 +27,8 @@ import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.asynchttpclient.AsyncHttpClient;
-import org.asynchttpclient.AsyncHttpClientConfig;
-import org.asynchttpclient.DefaultAsyncHttpClient;
-import org.asynchttpclient.DefaultAsyncHttpClientConfig;
 import org.asynchttpclient.Response;
 
 /**
@@ -40,30 +36,11 @@ import org.asynchttpclient.Response;
  */
 public class TokenClient implements ClientCredentialsExchanger {
 
-    protected static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
-    protected static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
-
     private final URL tokenUrl;
     private final AsyncHttpClient httpClient;
 
-    public TokenClient(URL tokenUrl) {
-        this(tokenUrl, null);
-    }
-
-    TokenClient(URL tokenUrl, AsyncHttpClient httpClient) {
-        if (httpClient == null) {
-            DefaultAsyncHttpClientConfig.Builder confBuilder = new 
DefaultAsyncHttpClientConfig.Builder();
-            confBuilder.setCookieStore(null);
-            confBuilder.setUseProxyProperties(true);
-            confBuilder.setFollowRedirect(true);
-            confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 
1000);
-            confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
-            confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", 
PulsarVersion.getVersion()));
-            AsyncHttpClientConfig config = confBuilder.build();
-            this.httpClient = new DefaultAsyncHttpClient(config);
-        } else {
-            this.httpClient = httpClient;
-        }
+    public TokenClient(URL tokenUrl, AsyncHttpClient httpClient) {
+        this.httpClient = httpClient;
         this.tokenUrl = tokenUrl;
     }
 
@@ -74,6 +51,7 @@ public class TokenClient implements 
ClientCredentialsExchanger {
 
     /**
      * Constructing http request parameters.
+     *
      * @param req object with relevant request parameters
      * @return Generate the final request body from a map.
      */
@@ -97,6 +75,7 @@ public class TokenClient implements 
ClientCredentialsExchanger {
 
     /**
      * Performs a token exchange using client credentials.
+     *
      * @param req the client credentials request details.
      * @return a token result
      * @throws TokenExchangeException
@@ -115,24 +94,26 @@ public class TokenClient implements 
ClientCredentialsExchanger {
                     .get();
 
             switch (res.getStatusCode()) {
-            case 200:
-                return 
ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(),
-                        TokenResult.class);
-
-            case 400: // Bad request
-            case 401: // Unauthorized
-                throw new TokenExchangeException(
-                        
ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(),
-                                TokenError.class));
-
-            default:
-                throw new IOException(
-                        "Failed to perform HTTP request. res: " + 
res.getStatusCode() + " " + res.getStatusText());
+                case 200:
+                    return 
ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(),
+                            TokenResult.class);
+
+                case 400: // Bad request
+                case 401: // Unauthorized
+                    throw new TokenExchangeException(
+                            
ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(),
+                                    TokenError.class));
+
+                default:
+                    throw new IOException(
+                            "Failed to perform HTTP request. res: " + 
res.getStatusCode() + " " + res.getStatusText());
             }
 
 
-
         } catch (InterruptedException | ExecutionException e1) {
+            if (e1 instanceof InterruptedException) {
+                Thread.currentThread().interrupt();
+            }
             throw new IOException(e1);
         }
     }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java
new file mode 100644
index 00000000000..602aafa7b6c
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import static org.testng.Assert.assertTrue;
+import java.io.IOException;
+import java.net.URL;
+import java.time.Duration;
+import org.apache.pulsar.client.api.Authentication;
+import org.testng.annotations.Test;
+
+public class AuthenticationFactoryOAuth2Test {
+
+    @Test
+    public void testBuilder() throws IOException {
+        URL issuerUrl = new URL("http://localhost";);
+        URL credentialsUrl = new URL("http://localhost";);
+        String audience = "audience";
+        String scope = "scope";
+        Duration connectTimeout = Duration.parse("PT11S");
+        Duration readTimeout = Duration.ofSeconds(31);
+        String trustCertsFilePath = null;
+        try (Authentication authentication =
+                     
AuthenticationFactoryOAuth2.clientCredentialsBuilder().issuerUrl(issuerUrl)
+                             
.credentialsUrl(credentialsUrl).audience(audience).scope(scope)
+                             
.connectTimeout(connectTimeout).readTimeout(readTimeout)
+                             .trustCertsFilePath(trustCertsFilePath).build()) {
+            assertTrue(authentication instanceof AuthenticationOAuth2);
+        }
+    }
+
+    @Test
+    public void testClientCredentials() throws IOException {
+        URL issuerUrl = new URL("http://localhost";);
+        URL credentialsUrl = new URL("http://localhost";);
+        String audience = "audience";
+        try (Authentication authentication =
+                     AuthenticationFactoryOAuth2.clientCredentials(issuerUrl, 
credentialsUrl, audience)) {
+            assertTrue(authentication instanceof AuthenticationOAuth2);
+        }
+    }
+
+}

Reply via email to