This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new cd7c179 [pulsar-client]Add a optional params scope for pulsar oauth2
client (#11931)
cd7c179 is described below
commit cd7c179161b6b5d52e1d1d7b747f0a75cb24a77d
Author: Guangning E <[email protected]>
AuthorDate: Thu Sep 9 08:55:45 2021 +0800
[pulsar-client]Add a optional params scope for pulsar oauth2 client (#11931)
### Motivation
In some scenarios (e.g. azure cloud), when the client exchanges tokens with
the server, an optional scope parameter is required, this pr fixes this issue,
to ensure compatibility, when the user does not fill in this parameter, all
behavior is the same as before.
### Modifications
* Add an optional parameter scope when exchanges token
### Verifying this change
- [x] Make sure that the change passes the CI checks.
(cherry picked from commit ac5114f8944784972b831438f8c7e0cbd57db4e5)
---
.../auth/oauth2/AuthenticationFactoryOAuth2.java | 18 ++++
.../impl/auth/oauth2/ClientCredentialsFlow.java | 9 +-
.../protocol/ClientCredentialsExchangeRequest.java | 3 +
.../impl/auth/oauth2/protocol/TokenClient.java | 54 +++++++---
.../impl/auth/oauth2/protocol/TokenClientTest.java | 116 +++++++++++++++++++++
5 files changed, 182 insertions(+), 18 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
index 54da5287d..707fcaf 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
@@ -37,10 +37,28 @@ public final class AuthenticationFactoryOAuth2 {
* @return an Authentication object
*/
public static Authentication clientCredentials(URL issuerUrl, URL
credentialsUrl, String audience) {
+ return clientCredentials(issuerUrl, credentialsUrl, audience, null);
+ }
+
+ /**
+ * Authenticate with client credentials.
+ *
+ * @param issuerUrl the issuer URL
+ * @param credentialsUrl the credentials URL
+ * @param audience the audience identifier
+ * @param scope An optional field. The value of the scope parameter is
expressed as a list of space-delimited,
+ * case-sensitive strings. The strings are defined by the
authorization server.
+ * If the value contains multiple space-delimited strings,
their order does not matter,
+ * and each string adds an additional access range to the
requested scope.
+ * From here:
https://datatracker.ietf.org/doc/html/rfc6749#section-4.4.2
+ * @return an Authentication object
+ */
+ public static Authentication clientCredentials(URL issuerUrl, URL
credentialsUrl, String audience, String scope) {
ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
.issuerUrl(issuerUrl)
.privateKey(credentialsUrl.toExternalForm())
.audience(audience)
+ .scope(scope)
.build();
return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
index 8d82cc2..b011e85 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -46,21 +46,24 @@ class ClientCredentialsFlow extends FlowBase {
public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl";
public static final String CONFIG_PARAM_AUDIENCE = "audience";
public static final String CONFIG_PARAM_KEY_FILE = "privateKey";
+ public static final String CONFIG_PARAM_SCOPE = "scope";
private static final long serialVersionUID = 1L;
private final String audience;
private final String privateKey;
+ private final String scope;
private transient ClientCredentialsExchanger exchanger;
private boolean initialized = false;
@Builder
- public ClientCredentialsFlow(URL issuerUrl, String audience, String
privateKey) {
+ public ClientCredentialsFlow(URL issuerUrl, String audience, String
privateKey, String scope) {
super(issuerUrl);
this.audience = audience;
this.privateKey = privateKey;
+ this.scope = scope;
}
@Override
@@ -87,6 +90,7 @@ class ClientCredentialsFlow extends FlowBase {
.clientId(keyFile.getClientId())
.clientSecret(keyFile.getClientSecret())
.audience(this.audience)
+ .scope(this.scope)
.build();
TokenResult tr;
if (!initialized) {
@@ -116,10 +120,13 @@ class ClientCredentialsFlow extends FlowBase {
URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL);
String audience = parseParameterString(params, CONFIG_PARAM_AUDIENCE);
String privateKeyUrl = parseParameterString(params,
CONFIG_PARAM_KEY_FILE);
+ // This is an optional parameter
+ String scope = params.get(CONFIG_PARAM_SCOPE);
return ClientCredentialsFlow.builder()
.issuerUrl(issuerUrl)
.audience(audience)
.privateKey(privateKeyUrl)
+ .scope(scope)
.build();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
index 7c14296..2d37bb5 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
@@ -39,4 +39,7 @@ public class ClientCredentialsExchangeRequest {
@JsonProperty("audience")
private String audience;
+
+ @JsonProperty("scope")
+ private String scope;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
index 0718073..bc059fb 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -27,6 +27,7 @@ import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.asynchttpclient.AsyncHttpClient;
@@ -47,15 +48,22 @@ public class TokenClient implements
ClientCredentialsExchanger {
private final AsyncHttpClient httpClient;
public TokenClient(URL tokenUrl) {
- this.tokenUrl = tokenUrl;
+ this(tokenUrl, null);
+ }
- DefaultAsyncHttpClientConfig.Builder confBuilder = new
DefaultAsyncHttpClientConfig.Builder();
- confBuilder.setFollowRedirect(true);
- confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS *
1000);
- confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
- confBuilder.setUserAgent(String.format("Pulsar-Java-v%s",
PulsarVersion.getVersion()));
- AsyncHttpClientConfig config = confBuilder.build();
- httpClient = new DefaultAsyncHttpClient(config);
+ TokenClient(URL tokenUrl, AsyncHttpClient httpClient) {
+ if (httpClient == null) {
+ DefaultAsyncHttpClientConfig.Builder confBuilder = new
DefaultAsyncHttpClientConfig.Builder();
+ confBuilder.setFollowRedirect(true);
+ confBuilder.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_IN_SECONDS *
1000);
+ confBuilder.setReadTimeout(DEFAULT_READ_TIMEOUT_IN_SECONDS * 1000);
+ confBuilder.setUserAgent(String.format("Pulsar-Java-v%s",
PulsarVersion.getVersion()));
+ AsyncHttpClientConfig config = confBuilder.build();
+ this.httpClient = new DefaultAsyncHttpClient(config);
+ } else {
+ this.httpClient = httpClient;
+ }
+ this.tokenUrl = tokenUrl;
}
@Override
@@ -64,6 +72,23 @@ public class TokenClient implements
ClientCredentialsExchanger {
}
/**
+ * Constructing http request parameters.
+ * @param bodyMap List of parameters to be requested.
+ * @return Generate the final request body from a map.
+ */
+ String buildClientCredentialsBody(Map<String, String> bodyMap) {
+ return bodyMap.entrySet().stream()
+ .map(e -> {
+ try {
+ return URLEncoder.encode(e.getKey(), "UTF-8") + '=' +
URLEncoder.encode(e.getValue(), "UTF-8");
+ } catch (UnsupportedEncodingException e1) {
+ throw new RuntimeException(e1);
+ }
+ })
+ .collect(Collectors.joining("&"));
+ }
+
+ /**
* Performs a token exchange using client credentials.
* @param req the client credentials request details.
* @return a token result
@@ -76,15 +101,10 @@ public class TokenClient implements
ClientCredentialsExchanger {
bodyMap.put("client_id", req.getClientId());
bodyMap.put("client_secret", req.getClientSecret());
bodyMap.put("audience", req.getAudience());
- String body = bodyMap.entrySet().stream()
- .map(e -> {
- try {
- return URLEncoder.encode(e.getKey(), "UTF-8") + '=' +
URLEncoder.encode(e.getValue(), "UTF-8");
- } catch (UnsupportedEncodingException e1) {
- throw new RuntimeException(e1);
- }
- })
- .collect(Collectors.joining("&"));
+ if (!StringUtils.isBlank(req.getScope())) {
+ bodyMap.put("scope", req.getScope());
+ }
+ String body = buildClientCredentialsBody(bodyMap);
try {
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
new file mode 100644
index 0000000..1617359
--- /dev/null
+++
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClientTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.google.gson.Gson;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Response;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Token client exchange token mock test.
+ */
+public class TokenClientTest {
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void exchangeClientCredentialsSuccessByScopeTest() throws
+ IOException, TokenExchangeException, ExecutionException,
InterruptedException {
+ DefaultAsyncHttpClient defaultAsyncHttpClient =
mock(DefaultAsyncHttpClient.class);
+ URL url = new URL("http://localhost");
+ TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
+ Map<String, String> bodyMap = new TreeMap<>();
+ ClientCredentialsExchangeRequest request =
ClientCredentialsExchangeRequest.builder()
+ .audience("test-audience")
+ .clientId("test-client-id")
+ .clientSecret("test-client-secret")
+ .scope("test-scope")
+ .build();
+ bodyMap.put("grant_type", "client_credentials");
+ bodyMap.put("client_id", request.getClientId());
+ bodyMap.put("client_secret", request.getClientSecret());
+ bodyMap.put("audience", request.getAudience());
+ bodyMap.put("scope", request.getScope());
+ String body = tokenClient.buildClientCredentialsBody(bodyMap);
+ BoundRequestBuilder boundRequestBuilder =
mock(BoundRequestBuilder.class);
+ Response response = mock(Response.class);
+ ListenableFuture<Response> listenableFuture =
mock(ListenableFuture.class);
+
when(defaultAsyncHttpClient.preparePost(url.toString())).thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.setHeader("Accept",
"application/json")).thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.setHeader("Content-Type",
"application/x-www-form-urlencoded")).thenReturn(boundRequestBuilder);
+
when(boundRequestBuilder.setBody(body)).thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.execute()).thenReturn(listenableFuture);
+ when(listenableFuture.get()).thenReturn(response);
+ when(response.getStatusCode()).thenReturn(200);
+ TokenResult tokenResult = new TokenResult();
+ tokenResult.setAccessToken("test-access-token");
+ tokenResult.setIdToken("test-id");
+ when(response.getResponseBodyAsBytes()).thenReturn(new
Gson().toJson(tokenResult).getBytes());
+ TokenResult tr = tokenClient.exchangeClientCredentials(request);
+ Assert.assertNotNull(tr);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void exchangeClientCredentialsSuccessByNoScopeTest() throws
+ IOException, TokenExchangeException, ExecutionException,
InterruptedException {
+ DefaultAsyncHttpClient defaultAsyncHttpClient =
mock(DefaultAsyncHttpClient.class);
+ URL url = new URL("http://localhost");
+ TokenClient tokenClient = new TokenClient(url, defaultAsyncHttpClient);
+ Map<String, String> bodyMap = new TreeMap<>();
+ ClientCredentialsExchangeRequest request =
ClientCredentialsExchangeRequest.builder()
+ .audience("test-audience")
+ .clientId("test-client-id")
+ .clientSecret("test-client-secret")
+ .build();
+ bodyMap.put("grant_type", "client_credentials");
+ bodyMap.put("client_id", request.getClientId());
+ bodyMap.put("client_secret", request.getClientSecret());
+ bodyMap.put("audience", request.getAudience());
+ String body = tokenClient.buildClientCredentialsBody(bodyMap);
+ BoundRequestBuilder boundRequestBuilder =
mock(BoundRequestBuilder.class);
+ Response response = mock(Response.class);
+ ListenableFuture<Response> listenableFuture =
mock(ListenableFuture.class);
+
when(defaultAsyncHttpClient.preparePost(url.toString())).thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.setHeader("Accept",
"application/json")).thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.setHeader("Content-Type",
"application/x-www-form-urlencoded")).thenReturn(boundRequestBuilder);
+
when(boundRequestBuilder.setBody(body)).thenReturn(boundRequestBuilder);
+ when(boundRequestBuilder.execute()).thenReturn(listenableFuture);
+ when(listenableFuture.get()).thenReturn(response);
+ when(response.getStatusCode()).thenReturn(200);
+ TokenResult tokenResult = new TokenResult();
+ tokenResult.setAccessToken("test-access-token");
+ tokenResult.setIdToken("test-id");
+ when(response.getResponseBodyAsBytes()).thenReturn(new
Gson().toJson(tokenResult).getBytes());
+ TokenResult tr = tokenClient.exchangeClientCredentials(request);
+ Assert.assertNotNull(tr);
+ }
+}