This is an automated email from the ASF dual-hosted git repository. eolivelli pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0daae95d28c1c10d48ddfb1b8a7bacb9224df59d Author: Addison Higham <[email protected]> AuthorDate: Thu Oct 7 15:08:55 2021 -0600 Fix Pulsar Proxy to re-use authentication instance (#12245) * Fix Pulsar Proxy to re-use authentication instance Currently, the Pulsar Proxy creates a new PulsarClientImpl, with a new instance of the client authentication plugin, for each connection. For certain client auth implementations, this can cause issues. For example, if a client plugin needs to generate a token and then cache and re-use it (which is very common with typical Pulsar client usage), this pattern breaks, because the client auth plugin is tied to the lifecycle of the connection and not the more "singleton" usage of the Pulsar client. Arguably, we should instead figure out how to re-use the entire Pulsar client, but that likely has more complexity; instead, this "quick fix" will get one of the most obvious cases solved. * Add test ensuring that all connections use the same auth instance * Simplify ProxyAuthTest It isn't clear why this test was using timeouts. It doesn't really seem to be testing anything, as the auth token was not being refreshed and yet it appears the test was expected to pass (it almost looks like they were expected to fail?). 
This allows us to make this test faster and more reliable, as the timeouts don't really seem to be adding anything --- .../pulsar/proxy/server/ProxyConnection.java | 9 ++------ .../apache/pulsar/proxy/server/ProxyService.java | 14 ++++++++++++ .../proxy/server/ProxyAuthenticationTest.java | 25 +++++++++++++--------- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index dd814cf..f1b7807 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -71,7 +71,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi private PulsarClientImpl client; private ConnectionPool connectionPool; private ProxyService service; - private Authentication clientAuthentication; AuthenticationDataSource authenticationData; private State state; private final Supplier<SslHandler> sslHandlerSupplier; @@ -303,7 +302,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi try { // init authn this.clientConf = createClientConfiguration(); - this.clientAuthentication = clientConf.getAuthentication(); int protocolVersion = getProtocolVersionToAdvertise(connect); // authn not enabled, complete @@ -427,10 +425,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi ClientConfigurationData clientConf = new ClientConfigurationData(); clientConf.setServiceUrl(service.getServiceUrl()); ProxyConfiguration proxyConfig = service.getConfiguration(); - if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) { - clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), - proxyConfig.getBrokerClientAuthenticationParameters())); - } + 
clientConf.setAuthentication(this.getClientAuthentication()); if (proxyConfig.isTlsEnabledWithBroker()) { clientConf.setUseTls(true); if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) { @@ -463,7 +458,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi } public Authentication getClientAuthentication() { - return clientAuthentication; + return service.getProxyClientAuthenticationPlugin(); } @Override diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index fe6362d..af5b2a8 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -53,6 +53,9 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.netty.EventLoopUtil; @@ -69,6 +72,7 @@ import org.slf4j.LoggerFactory; public class ProxyService implements Closeable { private final ProxyConfiguration proxyConfig; + private final Authentication proxyClientAuthentication; private final Timer timer; private String serviceUrl; private String serviceUrlTls; @@ -163,6 +167,12 @@ public class ProxyService implements Closeable { }); }, 60, TimeUnit.SECONDS); this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig); + if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) { + 
proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + } else { + proxyClientAuthentication = AuthenticationDisabled.INSTANCE; + } } public void start() throws Exception { @@ -367,5 +377,9 @@ public class ProxyService implements Closeable { proxyConfig.getZookeeperSessionTimeoutMs()); } + public Authentication getProxyClientAuthenticationPlugin() { + return this.proxyClientAuthentication; + } + private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index 2e74995..f6d53c8 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -50,6 +50,7 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.AuthAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -144,9 +145,11 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase { } JsonObject element = JsonParser.parseString(commandData).getAsJsonObject(); + log.info("Have log of {}", element); long expiryTimeInMillis = Long.parseLong(element.get("expiryTime").getAsString()); long currentTimeInMillis = System.currentTimeMillis(); if (expiryTimeInMillis < currentTimeInMillis) { + log.warn("Auth failed due to timeout"); throw new AuthenticationException("Authentication data has been expired"); } return element.get("entityType").getAsString(); @@ -198,10 +201,10 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase 
{ String namespaceName = "my-property/my-ns"; String topicName = "persistent://my-property/my-ns/my-topic1"; String subscriptionName = "my-subscriber-name"; - // expires after 6 seconds - String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 6 * 1000); - // expires after 3 seconds - String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 3 * 1000); + // expires after 60 seconds + String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 60 * 1000); + // expires after 60 seconds + String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 60 * 1000); admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy", Sets.newHashSet(AuthAction.consume, AuthAction.produce)); @@ -230,16 +233,18 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase { proxyService.start(); final String proxyServiceUrl = proxyService.getServiceUrl(); - // Step 3: Pass correct client params + // Step 3: Pass correct client params and use multiple connections @Cleanup - PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 1); + PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 3); proxyClient.newProducer(Schema.BYTES).topic(topicName).create(); - // Sleep for 4 seconds - wait for proxy auth params to expire - Thread.sleep(4 * 1000); proxyClient.newProducer(Schema.BYTES).topic(topicName).create(); - // Sleep for 3 seconds - wait for client auth parans to expire - Thread.sleep(3 * 1000); proxyClient.newProducer(Schema.BYTES).topic(topicName).create(); + + // Step 4: Ensure that all client contexts share the same auth provider + Assert.assertTrue(proxyService.getClientCnxs().size() >= 3, "expect at least 3 clients"); + proxyService.getClientCnxs().stream().forEach((cnx) -> { + Assert.assertSame(cnx.authenticationProvider, 
proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication")); + }); } private void updateAdminClient() throws PulsarClientException {
