This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b789d825feb [feat][client] oauth2 trustcerts file and timeouts (#24944)
b789d825feb is described below
commit b789d825feb3975092b2da59dea10e512d5d0b42
Author: gulecroc <[email protected]>
AuthorDate: Tue Nov 11 16:20:42 2025 +0100
[feat][client] oauth2 trustcerts file and timeouts (#24944)
---
.../auth/oauth2/AuthenticationFactoryOAuth2.java | 154 ++++++++++++++++++---
.../impl/auth/oauth2/ClientCredentialsFlow.java | 110 ++++++++-------
.../pulsar/client/impl/auth/oauth2/FlowBase.java | 76 +++++++++-
.../oauth2/protocol/DefaultMetadataResolver.java | 95 ++++++-------
.../impl/auth/oauth2/protocol/TokenClient.java | 59 +++-----
.../oauth2/AuthenticationFactoryOAuth2Test.java | 59 ++++++++
6 files changed, 395 insertions(+), 158 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
index c9abb3a3c01..033d5308a2a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl.auth.oauth2;
import java.net.URL;
import java.time.Clock;
+import java.time.Duration;
import org.apache.pulsar.client.api.Authentication;
/**
@@ -31,9 +32,9 @@ public final class AuthenticationFactoryOAuth2 {
/**
* Authenticate with client credentials.
*
- * @param issuerUrl the issuer URL
+ * @param issuerUrl the issuer URL
* @param credentialsUrl the credentials URL
- * @param audience An optional field. The audience identifier used by some
Identity Providers, like Auth0.
+ * @param audience An optional field. The audience identifier used
by some Identity Providers, like Auth0.
* @return an Authentication object
*/
public static Authentication clientCredentials(URL issuerUrl, URL
credentialsUrl, String audience) {
@@ -43,23 +44,144 @@ public final class AuthenticationFactoryOAuth2 {
/**
* Authenticate with client credentials.
*
- * @param issuerUrl the issuer URL
+ * @param issuerUrl the issuer URL
* @param credentialsUrl the credentials URL
- * @param audience An optional field. The audience identifier used by some
Identity Providers, like Auth0.
- * @param scope An optional field. The value of the scope parameter is
expressed as a list of space-delimited,
- * case-sensitive strings. The strings are defined by the
authorization server.
- * If the value contains multiple space-delimited strings,
their order does not matter,
- * and each string adds an additional access range to the
requested scope.
- * From here:
https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
+ * @param audience An optional field. The audience identifier used
by some Identity Providers, like Auth0.
+ * @param scope An optional field. The value of the scope
parameter is expressed as a list of
+ * space-delimited,
+ * case-sensitive strings. The strings are defined
by the authorization server.
+ * If the value contains multiple space-delimited
strings, their order does not matter,
+ * and each string adds an additional access range
to the requested scope.
+ * From here:
https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
* @return an Authentication object
*/
public static Authentication clientCredentials(URL issuerUrl, URL
credentialsUrl, String audience, String scope) {
- ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
- .issuerUrl(issuerUrl)
- .privateKey(credentialsUrl.toExternalForm())
- .audience(audience)
- .scope(scope)
- .build();
- return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
+ return
clientCredentialsBuilder().issuerUrl(issuerUrl).credentialsUrl(credentialsUrl).audience(audience)
+ .scope(scope).build();
}
+
+ /**
+ * A builder to create an authentication with client credentials.
+ *
+ * @return the builder
+ */
+ public static ClientCredentialsBuilder clientCredentialsBuilder() {
+ return new ClientCredentialsBuilder();
+ }
+
+ public static class ClientCredentialsBuilder {
+
+ private URL issuerUrl;
+ private URL credentialsUrl;
+ private String audience;
+ private String scope;
+ private Duration connectTimeout;
+ private Duration readTimeout;
+ private String trustCertsFilePath;
+
+ private ClientCredentialsBuilder() {
+ }
+
+ /**
+ * Required issuer URL.
+ *
+ * @param issuerUrl the issuer URL
+ * @return the builder
+ */
+ public ClientCredentialsBuilder issuerUrl(URL issuerUrl) {
+ this.issuerUrl = issuerUrl;
+ return this;
+ }
+
+ /**
+ * Required credentials URL.
+ *
+ * @param credentialsUrl the credentials URL
+ * @return the builder
+ */
+ public ClientCredentialsBuilder credentialsUrl(URL credentialsUrl) {
+ this.credentialsUrl = credentialsUrl;
+ return this;
+ }
+
+ /**
+ * Optional audience identifier used by some Identity Providers, like
Auth0.
+ *
+ * @param audience the audiance
+ * @return the builder
+ */
+ public ClientCredentialsBuilder audience(String audience) {
+ this.audience = audience;
+ return this;
+ }
+
+ /**
+ * Optional scope expressed as a list of space-delimited,
case-sensitive strings.
+ * The strings are defined by the authorization server.
+ * If the value contains multiple space-delimited strings, their order
does not matter,
+ * and each string adds an additional access range to the requested
scope.
+ * From here:
https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
+ *
+ * @param scope the scope
+ * @return the builder
+ */
+ public ClientCredentialsBuilder scope(String scope) {
+ this.scope = scope;
+ return this;
+ }
+
+ /**
+ * Optional HTTP connection timeout.
+ *
+ * @param connectTimeout the connect timeout
+ * @return the builder
+ */
+ public ClientCredentialsBuilder connectTimeout(Duration
connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ return this;
+ }
+
+ /**
+ * Optional HTTP read timeout.
+ *
+ * @param readTimeout the read timeout
+ * @return the builder
+ */
+ public ClientCredentialsBuilder readTimeout(Duration readTimeout) {
+ this.readTimeout = readTimeout;
+ return this;
+ }
+
+ /**
+ * Optional path to the file containing the trusted certificate(s) of
the token issuer.
+ *
+ * @param trustCertsFilePath the path to the file containing the
trusted certificate(s)
+ * @return the builder
+ */
+ public ClientCredentialsBuilder trustCertsFilePath(String
trustCertsFilePath) {
+ this.trustCertsFilePath = trustCertsFilePath;
+ return this;
+ }
+
+ /**
+ * Authenticate with client credentials.
+ *
+ * @return an Authentication object
+ */
+ public Authentication build() {
+ ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
+ .issuerUrl(issuerUrl)
+ .privateKey(credentialsUrl == null ? null :
credentialsUrl.toExternalForm())
+ .audience(audience)
+ .scope(scope)
+ .connectTimeout(connectTimeout)
+ .readTimeout(readTimeout)
+ .trustCertsFilePath(trustCertsFilePath)
+ .build();
+ return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
+ }
+
+ }
+
+
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
index ef10f1afdb6..7f64c0b18ac 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -26,6 +26,7 @@ import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
+import java.time.Duration;
import java.util.Map;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
@@ -60,62 +61,17 @@ class ClientCredentialsFlow extends FlowBase {
private boolean initialized = false;
@Builder
- public ClientCredentialsFlow(URL issuerUrl, String audience, String
privateKey, String scope) {
- super(issuerUrl);
+ public ClientCredentialsFlow(URL issuerUrl, String audience, String
privateKey, String scope,
+ Duration connectTimeout, Duration
readTimeout, String trustCertsFilePath) {
+ super(issuerUrl, connectTimeout, readTimeout, trustCertsFilePath);
this.audience = audience;
this.privateKey = privateKey;
this.scope = scope;
}
- @Override
- public void initialize() throws PulsarClientException {
- super.initialize();
- assert this.metadata != null;
-
- URL tokenUrl = this.metadata.getTokenEndpoint();
- this.exchanger = new TokenClient(tokenUrl);
- initialized = true;
- }
-
- public TokenResult authenticate() throws PulsarClientException {
- // read the private key from storage
- KeyFile keyFile;
- try {
- keyFile = loadPrivateKey(this.privateKey);
- } catch (IOException e) {
- throw new PulsarClientException.AuthenticationException("Unable to
read private key: " + e.getMessage());
- }
-
- // request an access token using client credentials
- ClientCredentialsExchangeRequest req =
ClientCredentialsExchangeRequest.builder()
- .clientId(keyFile.getClientId())
- .clientSecret(keyFile.getClientSecret())
- .audience(this.audience)
- .scope(this.scope)
- .build();
- TokenResult tr;
- if (!initialized) {
- initialize();
- }
- try {
- tr = this.exchanger.exchangeClientCredentials(req);
- } catch (TokenExchangeException | IOException e) {
- throw new PulsarClientException.AuthenticationException("Unable to
obtain an access token: "
- +
e.getMessage());
- }
-
- return tr;
- }
-
- @Override
- public void close() throws Exception {
- if (exchanger != null) {
- exchanger.close();
- }
- }
-
/**
* Constructs a {@link ClientCredentialsFlow} from configuration
parameters.
+ *
* @param params
* @return
*/
@@ -125,16 +81,24 @@ class ClientCredentialsFlow extends FlowBase {
// These are optional parameters, so we only perform a get
String scope = params.get(CONFIG_PARAM_SCOPE);
String audience = params.get(CONFIG_PARAM_AUDIENCE);
+ Duration connectTimeout = parseParameterDuration(params,
CONFIG_PARAM_CONNECT_TIMEOUT);
+ Duration readTimeout = parseParameterDuration(params,
CONFIG_PARAM_READ_TIMEOUT);
+ String trustCertsFilePath =
params.get(CONFIG_PARAM_TRUST_CERTS_FILE_PATH);
+
return ClientCredentialsFlow.builder()
.issuerUrl(issuerUrl)
.audience(audience)
.privateKey(privateKeyUrl)
.scope(scope)
+ .connectTimeout(connectTimeout)
+ .readTimeout(readTimeout)
+ .trustCertsFilePath(trustCertsFilePath)
.build();
}
/**
* Loads the private key from the given URL.
+ *
* @param privateKeyURL
* @return
* @throws IOException
@@ -162,4 +126,52 @@ class ClientCredentialsFlow extends FlowBase {
throw new IOException("Invalid privateKey format", e);
}
}
+
+ @Override
+ public void initialize() throws PulsarClientException {
+ super.initialize();
+ assert this.metadata != null;
+
+ URL tokenUrl = this.metadata.getTokenEndpoint();
+ this.exchanger = new TokenClient(tokenUrl, httpClient);
+ initialized = true;
+ }
+
+ public TokenResult authenticate() throws PulsarClientException {
+ // read the private key from storage
+ KeyFile keyFile;
+ try {
+ keyFile = loadPrivateKey(this.privateKey);
+ } catch (IOException e) {
+ throw new PulsarClientException.AuthenticationException("Unable to
read private key: " + e.getMessage());
+ }
+
+ // request an access token using client credentials
+ ClientCredentialsExchangeRequest req =
ClientCredentialsExchangeRequest.builder()
+ .clientId(keyFile.getClientId())
+ .clientSecret(keyFile.getClientSecret())
+ .audience(this.audience)
+ .scope(this.scope)
+ .build();
+ TokenResult tr;
+ if (!initialized) {
+ initialize();
+ }
+ try {
+ tr = this.exchanger.exchangeClientCredentials(req);
+ } catch (TokenExchangeException | IOException e) {
+ throw new PulsarClientException.AuthenticationException("Unable to
obtain an access token: "
+ + e.getMessage());
+ }
+
+ return tr;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (exchanger != null) {
+ exchanger.close();
+ }
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
index 125a8800862..6cc9f8e41b5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
@@ -18,16 +18,25 @@
*/
package org.apache.pulsar.client.impl.auth.oauth2;
+import io.netty.handler.ssl.SslContextBuilder;
+import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
+import java.time.Duration;
+import java.time.format.DateTimeParseException;
import java.util.Map;
+import javax.net.ssl.SSLException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata;
import org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
/**
* An abstract OAuth 2.0 authorization flow.
@@ -35,14 +44,60 @@ import
org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver;
@Slf4j
abstract class FlowBase implements Flow {
+ public static final String CONFIG_PARAM_CONNECT_TIMEOUT = "connectTimeout";
+ public static final String CONFIG_PARAM_READ_TIMEOUT = "readTimeout";
+ public static final String CONFIG_PARAM_TRUST_CERTS_FILE_PATH =
"trustCertsFilePath";
+
+ protected static final Duration DEFAULT_CONNECT_TIMEOUT =
Duration.ofSeconds(10);
+ protected static final Duration DEFAULT_READ_TIMEOUT =
Duration.ofSeconds(30);
+
private static final long serialVersionUID = 1L;
protected final URL issuerUrl;
+ protected final AsyncHttpClient httpClient;
protected transient Metadata metadata;
- protected FlowBase(URL issuerUrl) {
+ protected FlowBase(URL issuerUrl, Duration connectTimeout, Duration
readTimeout, String trustCertsFilePath) {
this.issuerUrl = issuerUrl;
+ this.httpClient = defaultHttpClient(readTimeout, connectTimeout,
trustCertsFilePath);
+ }
+
+ private AsyncHttpClient defaultHttpClient(Duration readTimeout, Duration
connectTimeout,
+ String trustCertsFilePath) {
+ DefaultAsyncHttpClientConfig.Builder confBuilder = new
DefaultAsyncHttpClientConfig.Builder();
+ confBuilder.setCookieStore(null);
+ confBuilder.setUseProxyProperties(true);
+ confBuilder.setFollowRedirect(true);
+ confBuilder.setConnectTimeout(
+ getParameterDurationToMillis(CONFIG_PARAM_CONNECT_TIMEOUT,
connectTimeout,
+ DEFAULT_CONNECT_TIMEOUT));
+ confBuilder.setReadTimeout(
+ getParameterDurationToMillis(CONFIG_PARAM_READ_TIMEOUT,
readTimeout, DEFAULT_READ_TIMEOUT));
+ confBuilder.setUserAgent(String.format("Pulsar-Java-v%s",
PulsarVersion.getVersion()));
+ if (StringUtils.isNotBlank(trustCertsFilePath)) {
+ try {
+ confBuilder.setSslContext(SslContextBuilder.forClient()
+ .trustManager(new File(trustCertsFilePath))
+ .build());
+ } catch (SSLException e) {
+ log.error("Could not set " +
CONFIG_PARAM_TRUST_CERTS_FILE_PATH, e);
+ }
+ }
+ return new DefaultAsyncHttpClient(confBuilder.build());
+ }
+
+ private int getParameterDurationToMillis(String name, Duration value,
Duration defaultValue) {
+ Duration duration;
+ if (value == null) {
+ log.info("Configuration for [{}] is using the default value:
[{}]", name, defaultValue);
+ duration = defaultValue;
+ } else {
+ log.info("Configuration for [{}] is: [{}]", name, value);
+ duration = value;
+ }
+
+ return (int) duration.toMillis();
}
public void initialize() throws PulsarClientException {
@@ -55,7 +110,7 @@ abstract class FlowBase implements Flow {
}
protected MetadataResolver createMetadataResolver() {
- return DefaultMetadataResolver.fromIssuerUrl(issuerUrl);
+ return DefaultMetadataResolver.fromIssuerUrl(issuerUrl, httpClient);
}
static String parseParameterString(Map<String, String> params, String
name) {
@@ -77,4 +132,21 @@ abstract class FlowBase implements Flow {
throw new IllegalArgumentException("Malformed configuration
parameter: " + name);
}
}
+
+ static Duration parseParameterDuration(Map<String, String> params, String
name) {
+ String value = params.get(name);
+ if (StringUtils.isNotBlank(value)) {
+ try {
+ return Duration.parse(value);
+ } catch (DateTimeParseException e) {
+ throw new IllegalArgumentException("Malformed configuration
parameter: " + name, e);
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ httpClient.close();
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
index be636145cb2..19d0c1acadd 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
@@ -19,88 +19,50 @@
package org.apache.pulsar.client.impl.auth.oauth2.protocol;
import com.fasterxml.jackson.databind.ObjectReader;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
-import java.net.URLConnection;
-import java.time.Duration;
+import java.util.concurrent.ExecutionException;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.Response;
/**
* Resolves OAuth 2.0 authorization server metadata as described in RFC 8414.
*/
public class DefaultMetadataResolver implements MetadataResolver {
- protected static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
- protected static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
-
private final URL metadataUrl;
private final ObjectReader objectReader;
- private Duration connectTimeout;
- private Duration readTimeout;
+ private final AsyncHttpClient httpClient;
- public DefaultMetadataResolver(URL metadataUrl) {
+ public DefaultMetadataResolver(URL metadataUrl, AsyncHttpClient
httpClient) {
this.metadataUrl = metadataUrl;
this.objectReader =
ObjectMapperFactory.getMapper().reader().forType(Metadata.class);
- // set a default timeout to ensure that this doesn't block
- this.connectTimeout =
Duration.ofSeconds(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS);
- this.readTimeout = Duration.ofSeconds(DEFAULT_READ_TIMEOUT_IN_SECONDS);
- }
-
- public DefaultMetadataResolver withConnectTimeout(Duration connectTimeout)
{
- this.connectTimeout = connectTimeout;
- return this;
- }
-
- public DefaultMetadataResolver withReadTimeout(Duration readTimeout) {
- this.readTimeout = readTimeout;
- return this;
- }
-
- /**
- * Resolves the authorization metadata.
- * @return metadata
- * @throws IOException if the metadata could not be resolved.
- */
- public Metadata resolve() throws IOException {
- try {
- URLConnection c = this.metadataUrl.openConnection();
- if (connectTimeout != null) {
- c.setConnectTimeout((int) connectTimeout.toMillis());
- }
- if (readTimeout != null) {
- c.setReadTimeout((int) readTimeout.toMillis());
- }
- c.setRequestProperty("Accept", "application/json");
-
- Metadata metadata;
- try (InputStream inputStream = c.getInputStream()) {
- metadata = this.objectReader.readValue(inputStream);
- }
- return metadata;
-
- } catch (IOException e) {
- throw new IOException("Cannot obtain authorization metadata from "
+ metadataUrl.toString(), e);
- }
+ this.httpClient = httpClient;
}
/**
* Gets a well-known metadata URL for the given OAuth issuer URL.
+ *
* @param issuerUrl The authorization server's issuer identifier
* @return a resolver
*/
- public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl) {
- return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl));
+ public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl,
AsyncHttpClient httpClient) {
+ return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl),
httpClient);
}
/**
* Gets a well-known metadata URL for the given OAuth issuer URL.
- * @see <a
href="https://tools.ietf.org/id/draft-ietf-oauth-discovery-08.html#ASConfig">
- * OAuth Discovery: Obtaining Authorization Server Metadata</a>
+ *
* @param issuerUrl The authorization server's issuer identifier
* @return a URL
+ * @see <a
href="https://tools.ietf.org/id/draft-ietf-oauth-discovery-08.html#ASConfig">
+ * OAuth Discovery: Obtaining Authorization Server Metadata</a>
*/
public static URL getWellKnownMetadataUrl(URL issuerUrl) {
try {
@@ -109,4 +71,33 @@ public class DefaultMetadataResolver implements
MetadataResolver {
throw new IllegalArgumentException(e);
}
}
+
+ /**
+ * Resolves the authorization metadata.
+ *
+ * @return metadata
+ * @throws IOException if the metadata could not be resolved.
+ */
+ public Metadata resolve() throws IOException {
+
+ try {
+ Response response = httpClient.prepareGet(metadataUrl.toString())
+ .addHeader(HttpHeaderNames.ACCEPT,
HttpHeaderValues.APPLICATION_JSON)
+ .execute()
+ .toCompletableFuture()
+ .get();
+
+ Metadata metadata;
+ try (InputStream inputStream = response.getResponseBodyAsStream())
{
+ metadata = this.objectReader.readValue(inputStream);
+ }
+ return metadata;
+
+ } catch (IOException | InterruptedException | ExecutionException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ throw new IOException("Cannot obtain authorization metadata from "
+ metadataUrl, e);
+ }
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
index f4e4c770e67..cb4c2a551d0 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -27,12 +27,8 @@ import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.asynchttpclient.AsyncHttpClient;
-import org.asynchttpclient.AsyncHttpClientConfig;
-import org.asynchttpclient.DefaultAsyncHttpClient;
-import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Response;
/**
@@ -40,30 +36,11 @@ import org.asynchttpclient.Response;
*/
public class TokenClient implements ClientCredentialsExchanger {
- protected static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
- protected static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
-
private final URL tokenUrl;
private final AsyncHttpClient httpClient;
- public TokenClient(URL tokenUrl) {
- this(tokenUrl, null);
- }
-
- TokenClient(URL tokenUrl, AsyncHttpClient httpClient) {
- if (httpClient == null) {
- DefaultAsyncHttpClientConfig.Builder confBuilder = new
DefaultAsyncHttpClientConfig.Builder();
- confBuilder.setCookieStore(null);
- confBuilder.setUseProxyProperties(true);
- confBuilder.setFollowRedirect(true);
- confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS *
1000);
- confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
- confBuilder.setUserAgent(String.format("Pulsar-Java-v%s",
PulsarVersion.getVersion()));
- AsyncHttpClientConfig config = confBuilder.build();
- this.httpClient = new DefaultAsyncHttpClient(config);
- } else {
- this.httpClient = httpClient;
- }
+ public TokenClient(URL tokenUrl, AsyncHttpClient httpClient) {
+ this.httpClient = httpClient;
this.tokenUrl = tokenUrl;
}
@@ -74,6 +51,7 @@ public class TokenClient implements
ClientCredentialsExchanger {
/**
* Constructing http request parameters.
+ *
* @param req object with relevant request parameters
* @return Generate the final request body from a map.
*/
@@ -97,6 +75,7 @@ public class TokenClient implements
ClientCredentialsExchanger {
/**
* Performs a token exchange using client credentials.
+ *
* @param req the client credentials request details.
* @return a token result
* @throws TokenExchangeException
@@ -115,24 +94,26 @@ public class TokenClient implements
ClientCredentialsExchanger {
.get();
switch (res.getStatusCode()) {
- case 200:
- return
ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(),
- TokenResult.class);
-
- case 400: // Bad request
- case 401: // Unauthorized
- throw new TokenExchangeException(
-
ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(),
- TokenError.class));
-
- default:
- throw new IOException(
- "Failed to perform HTTP request. res: " +
res.getStatusCode() + " " + res.getStatusText());
+ case 200:
+ return
ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(),
+ TokenResult.class);
+
+ case 400: // Bad request
+ case 401: // Unauthorized
+ throw new TokenExchangeException(
+
ObjectMapperFactory.getMapper().reader().readValue(res.getResponseBodyAsBytes(),
+ TokenError.class));
+
+ default:
+ throw new IOException(
+ "Failed to perform HTTP request. res: " +
res.getStatusCode() + " " + res.getStatusText());
}
-
} catch (InterruptedException | ExecutionException e1) {
+ if (e1 instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
throw new IOException(e1);
}
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java
new file mode 100644
index 00000000000..602aafa7b6c
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2Test.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import static org.testng.Assert.assertTrue;
+import java.io.IOException;
+import java.net.URL;
+import java.time.Duration;
+import org.apache.pulsar.client.api.Authentication;
+import org.testng.annotations.Test;
+
+public class AuthenticationFactoryOAuth2Test {
+
+ @Test
+ public void testBuilder() throws IOException {
+ URL issuerUrl = new URL("http://localhost");
+ URL credentialsUrl = new URL("http://localhost");
+ String audience = "audience";
+ String scope = "scope";
+ Duration connectTimeout = Duration.parse("PT11S");
+ Duration readTimeout = Duration.ofSeconds(31);
+ String trustCertsFilePath = null;
+ try (Authentication authentication =
+
AuthenticationFactoryOAuth2.clientCredentialsBuilder().issuerUrl(issuerUrl)
+
.credentialsUrl(credentialsUrl).audience(audience).scope(scope)
+
.connectTimeout(connectTimeout).readTimeout(readTimeout)
+ .trustCertsFilePath(trustCertsFilePath).build()) {
+ assertTrue(authentication instanceof AuthenticationOAuth2);
+ }
+ }
+
+ @Test
+ public void testClientCredentials() throws IOException {
+ URL issuerUrl = new URL("http://localhost");
+ URL credentialsUrl = new URL("http://localhost");
+ String audience = "audience";
+ try (Authentication authentication =
+ AuthenticationFactoryOAuth2.clientCredentials(issuerUrl,
credentialsUrl, audience)) {
+ assertTrue(authentication instanceof AuthenticationOAuth2);
+ }
+ }
+
+}