This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 7d4b2a54837 [fix][test] Replace test call to Auth0 with call to
WireMock (#20465)
7d4b2a54837 is described below
commit 7d4b2a548372d29696aa252c6e1871515032d1f8
Author: Jiwe Guo <[email protected]>
AuthorDate: Mon Jun 26 16:42:21 2023 +0800
[fix][test] Replace test call to Auth0 with call to WireMock (#20465)
---
pulsar-broker/pom.xml | 7 ++
...kenOauth2AuthenticatedProducerConsumerTest.java | 127 +++++++++++++++++++--
2 files changed, 124 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index c38be002187..4813ae51cbb 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -151,6 +151,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.github.tomakehurst</groupId>
+ <artifactId>wiremock-jre8</artifactId>
+ <version>${wiremock.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- zookeeper server -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
index 5d2e473d582..23b5f2076cb 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -18,15 +18,34 @@
*/
package org.apache.pulsar.client.api;
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static
com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.common.FileSource;
+import com.github.tomakehurst.wiremock.extension.Parameters;
+import com.github.tomakehurst.wiremock.extension.ResponseTransformer;
+import com.github.tomakehurst.wiremock.http.Request;
+import com.github.tomakehurst.wiremock.http.Response;
import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.jsonwebtoken.impl.DefaultJwtBuilder;
+import io.jsonwebtoken.security.Keys;
import java.net.URI;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.security.KeyPair;
+import java.security.PrivateKey;
import java.time.Duration;
+import java.util.Base64;
+import java.util.Date;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
@@ -35,6 +54,7 @@ import
org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
+import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
@@ -54,20 +74,51 @@ import org.testng.annotations.Test;
public class TokenOauth2AuthenticatedProducerConsumerTest extends
ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(TokenOauth2AuthenticatedProducerConsumerTest.class);
- // public key in oauth2 server to verify the client passed in token. get
from https://jwt.io/
- private final String TOKEN_TEST_PUBLIC_KEY =
"data:;base64,MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2tZd/4gJda3U2Pc3tpgRAN7JPGWx/Gn17v/0IiZlNNRbP/Mmf0Vc6G1qsnaRaWNWOR+t6/a6ekFHJMikQ1N2X6yfz4UjMc8/G2FDPRmWjA+GURzARjVhxc/BBEYGoD0Kwvbq/u9CZm2QjlKrYaLfg3AeB09j0btNrDJ8rBsNzU6AuzChRvXj9IdcE/A/4N/UQ+S9cJ4UXP6NJbToLwajQ5km+CnxdGE6nfB7LWHvOFHjn9C2Rb9e37CFlmeKmIVFkagFM0gbmGOb6bnGI8Bp/VNGV0APef4YaBvBTqwoZ1Z4aDHy5eRxXfAMdtBkBupmBXqL6bpd15XRYUbu/7ck9QIDAQAB";
+ private WireMockServer server;
private final String ADMIN_ROLE =
"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x@clients";
// Credentials File, which contains "client_id" and "client_secret"
private final String CREDENTIALS_FILE =
"./src/test/resources/authentication/token/credentials_file.json";
+ private final String AUDIENCE =
"https://dev-kt-aa9ne.us.auth0.com/api/v2/";
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
+ // Create the token key pair
+ KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+
+ // Start mocked OAuth2 server
+ server = new WireMockServer(wireMockConfig().port(0).extensions(new
OAuth2Transformer(keyPair, 3000)));
+ server.start();
+
+ // Set up a correct openid-configuration that points to the next stub
+ server.stubFor(
+ get(urlEqualTo("/.well-known/openid-configuration"))
+ .willReturn(aResponse()
+ .withHeader("Content-Type", "application/json")
+ .withBody("""
+ {
+ "issuer": "%s",
+ "token_endpoint": "%s/oauth/token"
+ }
+ """.replace("%s", server.baseUrl()))));
+
+ // Only respond when the client sends the expected request body
+ server.stubFor(
+ post(urlEqualTo("/oauth/token"))
+ .withRequestBody(
+
equalTo("audience=https%3A%2F%2Fdev-kt-aa9ne.us.auth0.com%2Fapi%2Fv2%2F&"
+ +
"client_id=Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x&"
+ +
"client_secret=rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N"
+ +
"_poGAb&grant_type=client_credentials"))
+ .willReturn(aResponse()
+ .withTransformers("o-auth-token-transformer")
+ .withStatus(200)));
+
conf.setAuthenticationEnabled(true);
conf.setAuthorizationEnabled(true);
- conf.setAuthenticationRefreshCheckSeconds(5);
+ conf.setAuthenticationRefreshCheckSeconds(1);
Set<String> superUserRoles = new HashSet<>();
superUserRoles.add(ADMIN_ROLE);
@@ -77,11 +128,19 @@ public class TokenOauth2AuthenticatedProducerConsumerTest
extends ProducerConsum
providers.add(AuthenticationProviderToken.class.getName());
conf.setAuthenticationProviders(providers);
+
conf.setBrokerClientAuthenticationPlugin(AuthenticationOAuth2.class.getName());
+ conf.setBrokerClientAuthenticationParameters("{\n"
+ + " \"privateKey\": \"" + CREDENTIALS_FILE + "\",\n"
+ + " \"issuerUrl\": \"" + server.baseUrl() + "\",\n"
+ + " \"audience\": \"" + AUDIENCE + "\",\n"
+ + "}\n");
+
conf.setClusterName("test");
// Set provider domain name
Properties properties = new Properties();
- properties.setProperty("tokenPublicKey", TOKEN_TEST_PUBLIC_KEY);
+ properties.setProperty("tokenPublicKey", "data:;base64,"
+ +
Base64.getEncoder().encodeToString(keyPair.getPublic().getEncoded()));
conf.setProperties(properties);
super.init();
@@ -94,9 +153,9 @@ public class TokenOauth2AuthenticatedProducerConsumerTest
extends ProducerConsum
// AuthenticationOAuth2
Authentication authentication =
AuthenticationFactoryOAuth2.clientCredentials(
- new URL("https://dev-kt-aa9ne.us.auth0.com"),
+ new URL(server.baseUrl()),
path.toUri().toURL(), // key file path
- "https://dev-kt-aa9ne.us.auth0.com/api/v2/"
+ AUDIENCE
);
admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
@@ -112,6 +171,7 @@ public class TokenOauth2AuthenticatedProducerConsumerTest
extends ProducerConsum
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
+ server.stop();
}
@DataProvider(name = "batch")
@@ -200,12 +260,11 @@ public class TokenOauth2AuthenticatedProducerConsumerTest
extends ProducerConsum
String accessTokenOld =
producerImpl.getClientCnx().getAuthenticationDataProvider().getCommandData();
long lastDisconnectTime = producer.getLastDisconnectedTimestamp();
- // the token expire duration is 10 seconds, so we need to wait for the
authenticationData refreshed
+ // the token expire duration is 3 seconds, so we need to wait for the
authenticationData refreshed
Awaitility.await()
- .atLeast(10, TimeUnit.SECONDS)
- .atMost(20, TimeUnit.SECONDS)
+ .atMost(10, TimeUnit.SECONDS)
.with()
- .pollInterval(Duration.ofSeconds(1))
+ .pollInterval(Duration.ofMillis(250))
.untilAsserted(() -> {
String accessTokenNew =
producerImpl.getClientCnx().getAuthenticationDataProvider().getCommandData();
assertNotEquals(accessTokenNew, accessTokenOld);
@@ -233,4 +292,52 @@ public class TokenOauth2AuthenticatedProducerConsumerTest
extends ProducerConsum
consumer.acknowledgeCumulative(msg);
consumer.close();
}
+
+ class OAuth2Transformer extends ResponseTransformer {
+
+ private final PrivateKey privateKey;
+ private final long tokenTTL;
+
+ OAuth2Transformer(KeyPair key, long tokenTTLMillis) {
+ this.privateKey = key.getPrivate();
+ this.tokenTTL = tokenTTLMillis;
+ }
+
+ @Override
+ public Response transform(Request request, Response response,
FileSource files, Parameters parameters) {
+ return Response.Builder.like(response).but().body("""
+ {
+
"access_token": "%s",
+ "expires_in":
%d,
+
"token_type":"Bearer"
+ }
+
""".formatted(generateToken(),
+ TimeUnit.MILLISECONDS.toSeconds(tokenTTL))).build();
+ }
+
+ @Override
+ public String getName() {
+ return "o-auth-token-transformer";
+ }
+
+ @Override
+ public boolean applyGlobally() {
+ return false;
+ }
+
+ private String generateToken() {
+ long now = System.currentTimeMillis();
+ DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
+ defaultJwtBuilder.setHeaderParam("typ", "JWT");
+ defaultJwtBuilder.setHeaderParam("alg", "RS256");
+ defaultJwtBuilder.setIssuer(server.baseUrl());
+ defaultJwtBuilder.setSubject(ADMIN_ROLE);
+ defaultJwtBuilder.setAudience(AUDIENCE);
+ defaultJwtBuilder.setIssuedAt(new Date(now));
+ defaultJwtBuilder.setNotBefore(new Date(now));
+ defaultJwtBuilder.setExpiration(new Date(now + tokenTTL));
+ defaultJwtBuilder.signWith(privateKey);
+ return defaultJwtBuilder.compact();
+ }
+ }
}