This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 396eceb  Make OAuth2 auth plugin to use AsyncHttpClient (#7615)
396eceb is described below

commit 396ecebe3e2bdb88b1599535a8d477f7a198f062
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Jul 24 14:49:16 2020 -0700

    Make OAuth2 auth plugin to use AsyncHttpClient (#7615)
---
 .../impl/auth/oauth2/AuthenticationOAuth2.java     |   6 +-
 .../impl/auth/oauth2/ClientCredentialsFlow.java    |   5 +-
 .../pulsar/client/impl/auth/oauth2/Flow.java       |   4 +-
 .../protocol/ClientCredentialsExchanger.java       |   7 +-
 .../impl/auth/oauth2/protocol/TokenClient.java     | 143 ++++++++++-----------
 5 files changed, 81 insertions(+), 84 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
index f7f41d0..be48efe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
@@ -107,7 +107,11 @@ public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticati
 
     @Override
     public void close() throws IOException {
-        flow.close();
+        try {
+            flow.close();
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
     }
 
     @Data
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
index 13bf0f5..a7b30ab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -34,7 +34,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import lombok.Builder;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.http.entity.ContentType;
 import org.apache.pulsar.client.api.PulsarClientException;
 
 /**
@@ -98,7 +97,7 @@ class ClientCredentialsFlow extends FlowBase {
     }
 
     @Override
-    public void close() {
+    public void close() throws Exception {
         exchanger.close();
     }
 
@@ -130,7 +129,7 @@ class ClientCredentialsFlow extends FlowBase {
 
             String protocol = urlConnection.getURL().getProtocol();
             String contentType = urlConnection.getContentType();
-            if ("data".equals(protocol) && !ContentType.APPLICATION_JSON.getMimeType().equals(contentType)) {
+            if ("data".equals(protocol) && !"application/json".equals(contentType)) {
                 throw new IllegalArgumentException(
                         "Unsupported media type or encoding format: " + urlConnection.getContentType());
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java
index b572325..0e7b864 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java
@@ -25,7 +25,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 /**
  * An OAuth 2.0 authorization flow.
  */
-interface Flow extends Serializable {
+interface Flow extends Serializable, AutoCloseable {
 
     /**
      * Initializes the authorization flow.
@@ -43,5 +43,5 @@ interface Flow extends Serializable {
     /**
      * Closes the authorization flow.
      */
-    void close();
+    void close() throws Exception;
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
index e6a956a..7d004c7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 /**
  * An interface for exchanging client credentials for an access token.
  */
-public interface ClientCredentialsExchanger {
+public interface ClientCredentialsExchanger extends AutoCloseable {
     /**
      * Requests an exchange of client credentials for an access token.
      * @param req the request details.
@@ -33,9 +33,4 @@ public interface ClientCredentialsExchanger {
      */
     TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
             throws TokenExchangeException, IOException;
-
-    /**
-     * Closes the exchanger.
-     */
-    void close();
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
index 715579d..0718073 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -18,54 +18,49 @@
  */
 package org.apache.pulsar.client.impl.auth.oauth2.protocol;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.net.HttpURLConnection;
+import java.io.UnsupportedEncodingException;
 import java.net.URL;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.http.Consts;
-import org.apache.http.HttpEntity;
-import org.apache.http.NameValuePair;
-import org.apache.http.StatusLine;
-import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.HttpResponseException;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.entity.ContentType;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.message.BasicNameValuePair;
-import org.apache.http.util.EntityUtils;
+import java.net.URLEncoder;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+import org.apache.pulsar.PulsarVersion;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.AsyncHttpClientConfig;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.Response;
 
 /**
  * A client for an OAuth 2.0 token endpoint.
  */
-public class TokenClient implements AutoCloseable, ClientCredentialsExchanger {
-
-    private static final ObjectReader resultReader;
-    private static final ObjectReader errorReader;
+public class TokenClient implements ClientCredentialsExchanger {
 
-    static {
-        resultReader = new ObjectMapper().readerFor(TokenResult.class);
-        errorReader = new ObjectMapper().readerFor(TokenError.class);
-    }
+    protected final static int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
+    protected final static int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
 
     private final URL tokenUrl;
-    private final CloseableHttpClient httpclient;
+    private final AsyncHttpClient httpClient;
 
     public TokenClient(URL tokenUrl) {
         this.tokenUrl = tokenUrl;
-        this.httpclient = HttpClientBuilder.create().useSystemProperties().disableCookieManagement().build();
+
+        DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
+        confBuilder.setFollowRedirect(true);
+        confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS * 1000);
+        confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
+        confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
+        AsyncHttpClientConfig config = confBuilder.build();
+        httpClient = new DefaultAsyncHttpClient(config);
     }
 
-    public void close() {
+    @Override
+    public void close() throws Exception {
+        httpClient.close();
     }
 
     /**
@@ -76,46 +71,50 @@ public class TokenClient implements AutoCloseable, ClientCredentialsExchanger {
      */
     public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
             throws TokenExchangeException, IOException {
-        List<NameValuePair> params = new ArrayList<>(4);
-        params.add(new BasicNameValuePair("grant_type", "client_credentials"));
-        params.add(new BasicNameValuePair("client_id", req.getClientId()));
-        params.add(new BasicNameValuePair("client_secret", req.getClientSecret()));
-        params.add(new BasicNameValuePair("audience", req.getAudience()));
-        HttpPost post = new HttpPost(tokenUrl.toString());
-        post.setHeader("Accept", ContentType.APPLICATION_JSON.getMimeType());
-        post.setEntity(new UrlEncodedFormEntity(params, Consts.UTF_8));
-
-        try (CloseableHttpResponse response = httpclient.execute(post)) {
-            StatusLine status = response.getStatusLine();
-            HttpEntity entity = response.getEntity();
-            try {
-                switch(status.getStatusCode()) {
-                    case HttpURLConnection.HTTP_OK:
-                        return readResponse(entity, resultReader);
-                    case HttpURLConnection.HTTP_BAD_REQUEST:
-                    case HttpURLConnection.HTTP_UNAUTHORIZED:
-                        throw new TokenExchangeException(readResponse(entity, errorReader));
-                    default:
-                        throw new HttpResponseException(status.getStatusCode(), status.getReasonPhrase());
-                }
-            } finally {
-                EntityUtils.consume(entity);
+        Map<String, String> bodyMap = new TreeMap<>();
+        bodyMap.put("grant_type", "client_credentials");
+        bodyMap.put("client_id", req.getClientId());
+        bodyMap.put("client_secret", req.getClientSecret());
+        bodyMap.put("audience", req.getAudience());
+        String body = bodyMap.entrySet().stream()
+                .map(e -> {
+                    try {
+                        return URLEncoder.encode(e.getKey(), "UTF-8") + '=' + URLEncoder.encode(e.getValue(), "UTF-8");
+                    } catch (UnsupportedEncodingException e1) {
+                        throw new RuntimeException(e1);
+                    }
+                })
+                .collect(Collectors.joining("&"));
+
+        try {
+
+            Response res = httpClient.preparePost(tokenUrl.toString())
+                    .setHeader("Accept", "application/json")
+                    .setHeader("Content-Type", "application/x-www-form-urlencoded")
+                    .setBody(body)
+                    .execute()
+                    .get();
+
+            switch (res.getStatusCode()) {
+            case 200:
+                return ObjectMapperFactory.getThreadLocal().reader().readValue(res.getResponseBodyAsBytes(),
+                        TokenResult.class);
+
+            case 400: // Bad request
+            case 401: // Unauthorized
+                throw new TokenExchangeException(
+                        ObjectMapperFactory.getThreadLocal().reader().readValue(res.getResponseBodyAsBytes(),
+                                TokenError.class));
+
+            default:
+                throw new IOException(
+                        "Failed to perform HTTP request. res: " + res.getStatusCode() + " " + res.getStatusText());
             }
-        }
-    }
 
-    private static <T> T readResponse(HttpEntity entity, ObjectReader objectReader) throws IOException {
-        ContentType contentType = ContentType.getOrDefault(entity);
-        if (!ContentType.APPLICATION_JSON.getMimeType().equals(contentType.getMimeType())) {
-            throw new ClientProtocolException("Unsupported content type: " + contentType.getMimeType());
-        }
-        Charset charset = contentType.getCharset();
-        if (charset == null) {
-            charset = StandardCharsets.UTF_8;
-        }
-        try (Reader reader = new InputStreamReader(entity.getContent(), charset)) {
-            @SuppressWarnings("unchecked") T obj = (T) objectReader.readValue(reader);
-            return obj;
+
+
+        } catch (InterruptedException | ExecutionException e1) {
+            throw new IOException(e1);
         }
     }
 }

Reply via email to