This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6b93a16 Fix Pulsar Proxy to re-use authentication instance (#12245)
6b93a16 is described below
commit 6b93a1688b25b937379b763b4e2cb7c5423fb41a
Author: Addison Higham <[email protected]>
AuthorDate: Thu Oct 7 15:08:55 2021 -0600
Fix Pulsar Proxy to re-use authentication instance (#12245)
* Fix Pulsar Proxy to re-use authentication instance
Currently, the Pulsar Proxy creates a new PulsarClientImpl, with a new
instance of the client authentication plugin, for every proxied connection.
For certain client auth implementations, this can cause issues. For
example, if a client plugin needs to generate a token and then cache and
re-use it (which is very common with typical Pulsar client usage), this
pattern breaks, because the client auth plugin is tied to the
lifecycle of the connection rather than to the more "singleton" usage of
the Pulsar client.
Arguably, we should instead figure out how to re-use the entire Pulsar
client, but that likely has more complexity. Instead, this "quick fix"
solves one of the most obvious cases.
* Add a test ensuring all connections share the same auth instance
* Simplify ProxyAuthTest
It isn't clear why this test was using timeouts. They don't really
seem to be testing anything: the auth token was not being refreshed,
yet the test was expected to pass (it almost looks like the producer
creations after each sleep were expected to fail?).
Removing them makes this test faster and more reliable, as the
timeouts don't really seem to be adding anything.
---
.../pulsar/proxy/server/ProxyConnection.java | 9 ++------
.../apache/pulsar/proxy/server/ProxyService.java | 14 ++++++++++++
.../proxy/server/ProxyAuthenticationTest.java | 25 +++++++++++++---------
3 files changed, 31 insertions(+), 17 deletions(-)
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index dd814cf..f1b7807 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -71,7 +71,6 @@ public class ProxyConnection extends PulsarHandler implements
FutureListener<Voi
private PulsarClientImpl client;
private ConnectionPool connectionPool;
private ProxyService service;
- private Authentication clientAuthentication;
AuthenticationDataSource authenticationData;
private State state;
private final Supplier<SslHandler> sslHandlerSupplier;
@@ -303,7 +302,6 @@ public class ProxyConnection extends PulsarHandler
implements FutureListener<Voi
try {
// init authn
this.clientConf = createClientConfiguration();
- this.clientAuthentication = clientConf.getAuthentication();
int protocolVersion = getProtocolVersionToAdvertise(connect);
// authn not enabled, complete
@@ -427,10 +425,7 @@ public class ProxyConnection extends PulsarHandler
implements FutureListener<Voi
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(service.getServiceUrl());
ProxyConfiguration proxyConfig = service.getConfiguration();
- if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
-
clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
- proxyConfig.getBrokerClientAuthenticationParameters()));
- }
+ clientConf.setAuthentication(this.getClientAuthentication());
if (proxyConfig.isTlsEnabledWithBroker()) {
clientConf.setUseTls(true);
if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) {
@@ -463,7 +458,7 @@ public class ProxyConnection extends PulsarHandler
implements FutureListener<Voi
}
public Authentication getClientAuthentication() {
- return clientAuthentication;
+ return service.getProxyClientAuthenticationPlugin();
}
@Override
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index fe6362d..af5b2a8 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -53,6 +53,9 @@ import
org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -69,6 +72,7 @@ import org.slf4j.LoggerFactory;
public class ProxyService implements Closeable {
private final ProxyConfiguration proxyConfig;
+ private final Authentication proxyClientAuthentication;
private final Timer timer;
private String serviceUrl;
private String serviceUrlTls;
@@ -163,6 +167,12 @@ public class ProxyService implements Closeable {
});
}, 60, TimeUnit.SECONDS);
this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig);
+ if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
+ proxyClientAuthentication =
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ } else {
+ proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
+ }
}
public void start() throws Exception {
@@ -367,5 +377,9 @@ public class ProxyService implements Closeable {
proxyConfig.getZookeeperSessionTimeoutMs());
}
+ public Authentication getProxyClientAuthenticationPlugin() {
+ return this.proxyClientAuthentication;
+ }
+
private static final Logger LOG =
LoggerFactory.getLogger(ProxyService.class);
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 2e74995..f6d53c8 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -50,6 +50,7 @@ import
org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -144,9 +145,11 @@ public class ProxyAuthenticationTest extends
ProducerConsumerBase {
}
JsonObject element =
JsonParser.parseString(commandData).getAsJsonObject();
+ log.info("Have log of {}", element);
long expiryTimeInMillis =
Long.parseLong(element.get("expiryTime").getAsString());
long currentTimeInMillis = System.currentTimeMillis();
if (expiryTimeInMillis < currentTimeInMillis) {
+ log.warn("Auth failed due to timeout");
throw new
AuthenticationException("Authentication data has been expired");
}
return element.get("entityType").getAsString();
@@ -198,10 +201,10 @@ public class ProxyAuthenticationTest extends
ProducerConsumerBase {
String namespaceName = "my-property/my-ns";
String topicName = "persistent://my-property/my-ns/my-topic1";
String subscriptionName = "my-subscriber-name";
- // expires after 6 seconds
- String clientAuthParams = "entityType:client,expiryTime:" +
(System.currentTimeMillis() + 6 * 1000);
- // expires after 3 seconds
- String proxyAuthParams = "entityType:proxy,expiryTime:" +
(System.currentTimeMillis() + 3 * 1000);
+ // expires after 60 seconds
+ String clientAuthParams = "entityType:client,expiryTime:" +
(System.currentTimeMillis() + 60 * 1000);
+ // expires after 60 seconds
+ String proxyAuthParams = "entityType:proxy,expiryTime:" +
(System.currentTimeMillis() + 60 * 1000);
admin.namespaces().grantPermissionOnNamespace(namespaceName,
"proxy",
Sets.newHashSet(AuthAction.consume,
AuthAction.produce));
@@ -230,16 +233,18 @@ public class ProxyAuthenticationTest extends
ProducerConsumerBase {
proxyService.start();
final String proxyServiceUrl = proxyService.getServiceUrl();
- // Step 3: Pass correct client params
+ // Step 3: Pass correct client params and use multiple
connections
@Cleanup
- PulsarClient proxyClient = createPulsarClient(proxyServiceUrl,
clientAuthParams, 1);
+ PulsarClient proxyClient = createPulsarClient(proxyServiceUrl,
clientAuthParams, 3);
proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
- // Sleep for 4 seconds - wait for proxy auth params to expire
- Thread.sleep(4 * 1000);
proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
- // Sleep for 3 seconds - wait for client auth parans to expire
- Thread.sleep(3 * 1000);
proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
+
+ // Step 4: Ensure that all client contexts share the same auth
provider
+ Assert.assertTrue(proxyService.getClientCnxs().size() >= 3,
"expect at least 3 clients");
+ proxyService.getClientCnxs().stream().forEach((cnx) -> {
+ Assert.assertSame(cnx.authenticationProvider,
proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication"));
+ });
}
private void updateAdminClient() throws PulsarClientException {