This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 396eceb Make OAuth2 auth plugin to use AsyncHttpClient (#7615)
396eceb is described below
commit 396ecebe3e2bdb88b1599535a8d477f7a198f062
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Jul 24 14:49:16 2020 -0700
Make OAuth2 auth plugin to use AsyncHttpClient (#7615)
---
.../impl/auth/oauth2/AuthenticationOAuth2.java | 6 +-
.../impl/auth/oauth2/ClientCredentialsFlow.java | 5 +-
.../pulsar/client/impl/auth/oauth2/Flow.java | 4 +-
.../protocol/ClientCredentialsExchanger.java | 7 +-
.../impl/auth/oauth2/protocol/TokenClient.java | 143 ++++++++++-----------
5 files changed, 81 insertions(+), 84 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
index f7f41d0..be48efe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
@@ -107,7 +107,11 @@ public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticati
@Override
public void close() throws IOException {
- flow.close();
+ try {
+ flow.close();
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
@Data
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
index 13bf0f5..a7b30ab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -34,7 +34,6 @@ import java.nio.charset.StandardCharsets;
import java.util.Map;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
-import org.apache.http.entity.ContentType;
import org.apache.pulsar.client.api.PulsarClientException;
/**
@@ -98,7 +97,7 @@ class ClientCredentialsFlow extends FlowBase {
}
@Override
- public void close() {
+ public void close() throws Exception {
exchanger.close();
}
@@ -130,7 +129,7 @@ class ClientCredentialsFlow extends FlowBase {
String protocol = urlConnection.getURL().getProtocol();
String contentType = urlConnection.getContentType();
- if ("data".equals(protocol) && !ContentType.APPLICATION_JSON.getMimeType().equals(contentType)) {
+ if ("data".equals(protocol) && !"application/json".equals(contentType)) {
throw new IllegalArgumentException(
"Unsupported media type or encoding format: " +
urlConnection.getContentType());
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java
index b572325..0e7b864 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java
@@ -25,7 +25,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
/**
* An OAuth 2.0 authorization flow.
*/
-interface Flow extends Serializable {
+interface Flow extends Serializable, AutoCloseable {
/**
* Initializes the authorization flow.
@@ -43,5 +43,5 @@ interface Flow extends Serializable {
/**
* Closes the authorization flow.
*/
- void close();
+ void close() throws Exception;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
index e6a956a..7d004c7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
@@ -23,7 +23,7 @@ import java.io.IOException;
/**
* An interface for exchanging client credentials for an access token.
*/
-public interface ClientCredentialsExchanger {
+public interface ClientCredentialsExchanger extends AutoCloseable {
/**
* Requests an exchange of client credentials for an access token.
* @param req the request details.
@@ -33,9 +33,4 @@ public interface ClientCredentialsExchanger {
*/
TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
throws TokenExchangeException, IOException;
-
- /**
- * Closes the exchanger.
- */
- void close();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
index 715579d..0718073 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -18,54 +18,49 @@
*/
package org.apache.pulsar.client.impl.auth.oauth2.protocol;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.HttpURLConnection;
+import java.io.UnsupportedEncodingException;
import java.net.URL;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.http.Consts;
-import org.apache.http.HttpEntity;
-import org.apache.http.NameValuePair;
-import org.apache.http.StatusLine;
-import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.HttpResponseException;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ContentType;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.message.BasicNameValuePair;
-import org.apache.http.util.EntityUtils;
+import java.net.URLEncoder;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.AsyncHttpClientConfig;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.Response;
/**
* A client for an OAuth 2.0 token endpoint.
*/
-public class TokenClient implements AutoCloseable, ClientCredentialsExchanger {
-
- private static final ObjectReader resultReader;
- private static final ObjectReader errorReader;
+public class TokenClient implements ClientCredentialsExchanger {
- static {
- resultReader = new ObjectMapper().readerFor(TokenResult.class);
- errorReader = new ObjectMapper().readerFor(TokenError.class);
- }
+ protected final static int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
+ protected final static int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
private final URL tokenUrl;
- private final CloseableHttpClient httpclient;
+ private final AsyncHttpClient httpClient;
public TokenClient(URL tokenUrl) {
this.tokenUrl = tokenUrl;
- this.httpclient = HttpClientBuilder.create().useSystemProperties().disableCookieManagement().build();
+
+ DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
+ confBuilder.setFollowRedirect(true);
+ confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
+ confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
+ confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
+ AsyncHttpClientConfig config = confBuilder.build();
+ httpClient = new DefaultAsyncHttpClient(config);
}
- public void close() {
+ @Override
+ public void close() throws Exception {
+ httpClient.close();
}
/**
@@ -76,46 +71,50 @@ public class TokenClient implements AutoCloseable, ClientCredentialsExchanger {
*/
public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
throws TokenExchangeException, IOException {
- List<NameValuePair> params = new ArrayList<>(4);
- params.add(new BasicNameValuePair("grant_type", "client_credentials"));
- params.add(new BasicNameValuePair("client_id", req.getClientId()));
- params.add(new BasicNameValuePair("client_secret", req.getClientSecret()));
- params.add(new BasicNameValuePair("audience", req.getAudience()));
- HttpPost post = new HttpPost(tokenUrl.toString());
- post.setHeader("Accept", ContentType.APPLICATION_JSON.getMimeType());
- post.setEntity(new UrlEncodedFormEntity(params, Consts.UTF_8));
-
- try (CloseableHttpResponse response = httpclient.execute(post)) {
- StatusLine status = response.getStatusLine();
- HttpEntity entity = response.getEntity();
- try {
- switch(status.getStatusCode()) {
- case HttpURLConnection.HTTP_OK:
- return readResponse(entity, resultReader);
- case HttpURLConnection.HTTP_BAD_REQUEST:
- case HttpURLConnection.HTTP_UNAUTHORIZED:
- throw new TokenExchangeException(readResponse(entity, errorReader));
- default:
- throw new HttpResponseException(status.getStatusCode(), status.getReasonPhrase());
- }
- } finally {
- EntityUtils.consume(entity);
+ Map<String, String> bodyMap = new TreeMap<>();
+ bodyMap.put("grant_type", "client_credentials");
+ bodyMap.put("client_id", req.getClientId());
+ bodyMap.put("client_secret", req.getClientSecret());
+ bodyMap.put("audience", req.getAudience());
+ String body = bodyMap.entrySet().stream()
+ .map(e -> {
+ try {
+ return URLEncoder.encode(e.getKey(), "UTF-8") + '=' + URLEncoder.encode(e.getValue(), "UTF-8");
+ } catch (UnsupportedEncodingException e1) {
+ throw new RuntimeException(e1);
+ }
+ })
+ .collect(Collectors.joining("&"));
+
+ try {
+
+ Response res = httpClient.preparePost(tokenUrl.toString())
+ .setHeader("Accept", "application/json")
+ .setHeader("Content-Type", "application/x-www-form-urlencoded")
+ .setBody(body)
+ .execute()
+ .get();
+
+ switch (res.getStatusCode()) {
+ case 200:
+ return ObjectMapperFactory.getThreadLocal().reader().readValue(res.getResponseBodyAsBytes(),
+ TokenResult.class);
+
+ case 400: // Bad request
+ case 401: // Unauthorized
+ throw new TokenExchangeException(
+ ObjectMapperFactory.getThreadLocal().reader().readValue(res.getResponseBodyAsBytes(),
+ TokenError.class));
+
+ default:
+ throw new IOException(
+ "Failed to perform HTTP request. res: " + res.getStatusCode() + " " + res.getStatusText());
}
- }
- }
- private static <T> T readResponse(HttpEntity entity, ObjectReader objectReader) throws IOException {
- ContentType contentType = ContentType.getOrDefault(entity);
- if (!ContentType.APPLICATION_JSON.getMimeType().equals(contentType.getMimeType())) {
- throw new ClientProtocolException("Unsupported content type: " + contentType.getMimeType());
- }
- Charset charset = contentType.getCharset();
- if (charset == null) {
- charset = StandardCharsets.UTF_8;
- }
- try (Reader reader = new InputStreamReader(entity.getContent(), charset)) {
- @SuppressWarnings("unchecked") T obj = (T) objectReader.readValue(reader);
- return obj;
+
+
+ } catch (InterruptedException | ExecutionException e1) {
+ throw new IOException(e1);
}
}
}