This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0825f86ada22cdd6429add175fed9b04410213ec Author: Matteo Merli <[email protected]> AuthorDate: Fri Jul 24 14:49:16 2020 -0700 Make OAuth2 auth plugin to use AsyncHttpClient (#7615) (cherry picked from commit 396ecebe3e2bdb88b1599535a8d477f7a198f062) --- .../impl/auth/oauth2/AuthenticationOAuth2.java | 6 +- .../impl/auth/oauth2/ClientCredentialsFlow.java | 5 +- .../pulsar/client/impl/auth/oauth2/Flow.java | 4 +- .../protocol/ClientCredentialsExchanger.java | 7 +- .../impl/auth/oauth2/protocol/TokenClient.java | 143 ++++++++++----------- 5 files changed, 81 insertions(+), 84 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java index f7f41d0..be48efe 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java @@ -107,7 +107,11 @@ public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticati @Override public void close() throws IOException { - flow.close(); + try { + flow.close(); + } catch (Exception e) { + throw new IOException(e); + } } @Data diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java index 13bf0f5..a7b30ab 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java @@ -34,7 +34,6 @@ import java.nio.charset.StandardCharsets; import java.util.Map; import lombok.Builder; import lombok.extern.slf4j.Slf4j; -import org.apache.http.entity.ContentType; import org.apache.pulsar.client.api.PulsarClientException; /** @@ -98,7 +97,7 @@ class ClientCredentialsFlow extends FlowBase { } @Override - public void close() { + public void close() throws Exception { exchanger.close(); } @@ -130,7 +129,7 @@ class ClientCredentialsFlow extends FlowBase { String protocol = urlConnection.getURL().getProtocol(); String contentType = urlConnection.getContentType(); - if ("data".equals(protocol) && !ContentType.APPLICATION_JSON.getMimeType().equals(contentType)) { + if ("data".equals(protocol) && !"application/json".equals(contentType)) { throw new IllegalArgumentException( "Unsupported media type or encoding format: " + urlConnection.getContentType()); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java index b572325..0e7b864 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java @@ -25,7 +25,7 @@ import org.apache.pulsar.client.api.PulsarClientException; /** * An OAuth 2.0 authorization flow. */ -interface Flow extends Serializable { +interface Flow extends Serializable, AutoCloseable { /** * Initializes the authorization flow. @@ -43,5 +43,5 @@ interface Flow extends Serializable { /** * Closes the authorization flow. */ - void close(); + void close() throws Exception; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java index e6a956a..7d004c7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java @@ -23,7 +23,7 @@ import java.io.IOException; /** * An interface for exchanging client credentials for an access token. */ -public interface ClientCredentialsExchanger { +public interface ClientCredentialsExchanger extends AutoCloseable { /** * Requests an exchange of client credentials for an access token. * @param req the request details. @@ -33,9 +33,4 @@ public interface ClientCredentialsExchanger { */ TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req) throws TokenExchangeException, IOException; - - /** - * Closes the exchanger. - */ - void close(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java index 715579d..0718073 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java @@ -18,54 +18,49 @@ */ package org.apache.pulsar.client.impl.auth.oauth2.protocol; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.Reader; -import java.net.HttpURLConnection; +import java.io.UnsupportedEncodingException; import java.net.URL; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import org.apache.http.Consts; -import org.apache.http.HttpEntity; -import org.apache.http.NameValuePair; -import org.apache.http.StatusLine; -import org.apache.http.client.ClientProtocolException; -import org.apache.http.client.HttpResponseException; -import org.apache.http.client.entity.UrlEncodedFormEntity; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.entity.ContentType; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.message.BasicNameValuePair; -import org.apache.http.util.EntityUtils; +import java.net.URLEncoder; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.AsyncHttpClientConfig; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.Response; /** * A client for an OAuth 2.0 token endpoint. */ -public class TokenClient implements AutoCloseable, ClientCredentialsExchanger { - - private static final ObjectReader resultReader; - private static final ObjectReader errorReader; +public class TokenClient implements ClientCredentialsExchanger { - static { - resultReader = new ObjectMapper().readerFor(TokenResult.class); - errorReader = new ObjectMapper().readerFor(TokenError.class); - } + protected final static int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10; + protected final static int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30; private final URL tokenUrl; - private final CloseableHttpClient httpclient; + private final AsyncHttpClient httpClient; public TokenClient(URL tokenUrl) { this.tokenUrl = tokenUrl; - this.httpclient = HttpClientBuilder.create().useSystemProperties().disableCookieManagement().build(); + + DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); + confBuilder.setFollowRedirect(true); + confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000); + confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000); + confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); + AsyncHttpClientConfig config = confBuilder.build(); + httpClient = new DefaultAsyncHttpClient(config); } - public void close() { + @Override + public void close() throws Exception { + httpClient.close(); } /** @@ -76,46 +71,50 @@ public class TokenClient implements AutoCloseable, ClientCredentialsExchanger { */ public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req) throws TokenExchangeException, IOException { - List<NameValuePair> params = new ArrayList<>(4); - params.add(new BasicNameValuePair("grant_type", "client_credentials")); - params.add(new BasicNameValuePair("client_id", req.getClientId())); - params.add(new BasicNameValuePair("client_secret", req.getClientSecret())); - params.add(new BasicNameValuePair("audience", req.getAudience())); - HttpPost post = new HttpPost(tokenUrl.toString()); - post.setHeader("Accept", ContentType.APPLICATION_JSON.getMimeType()); - post.setEntity(new UrlEncodedFormEntity(params, Consts.UTF_8)); - - try (CloseableHttpResponse response = httpclient.execute(post)) { - StatusLine status = response.getStatusLine(); - HttpEntity entity = response.getEntity(); - try { - switch(status.getStatusCode()) { - case HttpURLConnection.HTTP_OK: - return readResponse(entity, resultReader); - case HttpURLConnection.HTTP_BAD_REQUEST: - case HttpURLConnection.HTTP_UNAUTHORIZED: - throw new TokenExchangeException(readResponse(entity, errorReader)); - default: - throw new HttpResponseException(status.getStatusCode(), status.getReasonPhrase()); - } - } finally { - EntityUtils.consume(entity); + Map<String, String> bodyMap = new TreeMap<>(); + bodyMap.put("grant_type", "client_credentials"); + bodyMap.put("client_id", req.getClientId()); + bodyMap.put("client_secret", req.getClientSecret()); + bodyMap.put("audience", req.getAudience()); + String body = bodyMap.entrySet().stream() + .map(e -> { + try { + return URLEncoder.encode(e.getKey(), "UTF-8") + '=' + URLEncoder.encode(e.getValue(), "UTF-8"); + } catch (UnsupportedEncodingException e1) { + throw new RuntimeException(e1); + } + }) + .collect(Collectors.joining("&")); + + try { + + Response res = httpClient.preparePost(tokenUrl.toString()) + .setHeader("Accept", "application/json") + .setHeader("Content-Type", "application/x-www-form-urlencoded") + .setBody(body) + .execute() + .get(); + + switch (res.getStatusCode()) { + case 200: + return ObjectMapperFactory.getThreadLocal().reader().readValue(res.getResponseBodyAsBytes(), + TokenResult.class); + + case 400: // Bad request + case 401: // Unauthorized + throw new TokenExchangeException( + ObjectMapperFactory.getThreadLocal().reader().readValue(res.getResponseBodyAsBytes(), + TokenError.class)); + + default: + throw new IOException( + "Failed to perform HTTP request. res: " + res.getStatusCode() + " " + res.getStatusText()); } - } - } - private static <T> T readResponse(HttpEntity entity, ObjectReader objectReader) throws IOException { - ContentType contentType = ContentType.getOrDefault(entity); - if (!ContentType.APPLICATION_JSON.getMimeType().equals(contentType.getMimeType())) { - throw new ClientProtocolException("Unsupported content type: " + contentType.getMimeType()); - } - Charset charset = contentType.getCharset(); - if (charset == null) { - charset = StandardCharsets.UTF_8; - } - try (Reader reader = new InputStreamReader(entity.getContent(), charset)) { - @SuppressWarnings("unchecked") T obj = (T) objectReader.readValue(reader); - return obj; + + + } catch (InterruptedException | ExecutionException e1) { + throw new IOException(e1); } } }
