This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ef2753289ec2458fb157b51f2a2bcb2ea5645859 Author: Addison Higham <[email protected]> AuthorDate: Thu Oct 7 15:08:55 2021 -0600 Fix Pulsar Proxy to re-use authentication instance (#12245) * Fix Pulsar Proxy to re-use authentication instance Currently, the Pulsar Proxy creates a new PulsarClientImpl, with a new instance of the client authentication plugin, for each connection. For certain client auth implementations, this can cause issues. For example, if a client plugin needs to generate a token and then cache and re-use it (which is very common with typical Pulsar client usage), this pattern breaks, because the client auth plugin is tied to the lifecycle of the connection and not the more "singleton" usage of the Pulsar client. Arguably, we should instead figure out how to re-use the entire Pulsar client, but that likely has more complexity; instead, this "quick fix" solves one of the most obvious cases. * Add test ensuring all connections use the same auth instance * Simplify ProxyAuthTest It isn't clear why this test relied on timeouts. They don't really seem to test anything: the auth token was never refreshed, yet the test was expected to pass (it almost looks as though the steps after each sleep were expected to fail?). 
This allows us to make this test faster and more reliable, as the timeouts don't really seem to be adding anything (cherry picked from commit 6b93a1688b25b937379b763b4e2cb7c5423fb41a) --- .../pulsar/proxy/server/ProxyConnection.java | 9 ++------ .../apache/pulsar/proxy/server/ProxyService.java | 16 +++++++++++++- .../proxy/server/ProxyAuthenticationTest.java | 25 +++++++++++++--------- 3 files changed, 32 insertions(+), 18 deletions(-) diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java index dd814cf..f1b7807 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java @@ -71,7 +71,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi private PulsarClientImpl client; private ConnectionPool connectionPool; private ProxyService service; - private Authentication clientAuthentication; AuthenticationDataSource authenticationData; private State state; private final Supplier<SslHandler> sslHandlerSupplier; @@ -303,7 +302,6 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi try { // init authn this.clientConf = createClientConfiguration(); - this.clientAuthentication = clientConf.getAuthentication(); int protocolVersion = getProtocolVersionToAdvertise(connect); // authn not enabled, complete @@ -427,10 +425,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi ClientConfigurationData clientConf = new ClientConfigurationData(); clientConf.setServiceUrl(service.getServiceUrl()); ProxyConfiguration proxyConfig = service.getConfiguration(); - if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) { - clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), - 
proxyConfig.getBrokerClientAuthenticationParameters())); - } + clientConf.setAuthentication(this.getClientAuthentication()); if (proxyConfig.isTlsEnabledWithBroker()) { clientConf.setUseTls(true); if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) { @@ -463,7 +458,7 @@ public class ProxyConnection extends PulsarHandler implements FutureListener<Voi } public Authentication getClientAuthentication() { - return clientAuthentication; + return service.getProxyClientAuthenticationPlugin(); } @Override diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index 53f5c9d..abc7a5f 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -52,12 +52,15 @@ import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.cache.ConfigurationMetadataCacheService; import org.apache.pulsar.broker.resources.PulsarResources; +import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; -import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets; import org.apache.pulsar.proxy.stats.TopicStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +74,7 @@ import com.google.common.collect.Sets; public class ProxyService 
implements Closeable { private final ProxyConfiguration proxyConfig; + private final Authentication proxyClientAuthentication; private final Timer timer; private String serviceUrl; private String serviceUrlTls; @@ -161,6 +165,12 @@ public class ProxyService implements Closeable { }); }, 60, TimeUnit.SECONDS); this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig); + if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) { + proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters()); + } else { + proxyClientAuthentication = AuthenticationDisabled.INSTANCE; + } } public void start() throws Exception { @@ -336,5 +346,9 @@ public class ProxyService implements Closeable { proxyConfig.getZookeeperSessionTimeoutMs()); } + public Authentication getProxyClientAuthenticationPlugin() { + return this.proxyClientAuthentication; + } + private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class); } diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java index 550aad4..57aa781 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java @@ -50,6 +50,7 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.policies.data.AuthAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -145,9 +146,11 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase { JsonParser parser = new JsonParser(); JsonObject element = parser.parse(commandData).getAsJsonObject(); + 
log.info("Have log of {}", element); long expiryTimeInMillis = Long.parseLong(element.get("expiryTime").getAsString()); long currentTimeInMillis = System.currentTimeMillis(); if (expiryTimeInMillis < currentTimeInMillis) { + log.warn("Auth failed due to timeout"); throw new AuthenticationException("Authentication data has been expired"); } return element.get("entityType").getAsString(); @@ -199,10 +202,10 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase { String namespaceName = "my-property/my-ns"; String topicName = "persistent://my-property/my-ns/my-topic1"; String subscriptionName = "my-subscriber-name"; - // expires after 6 seconds - String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 6 * 1000); - // expires after 3 seconds - String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 3 * 1000); + // expires after 60 seconds + String clientAuthParams = "entityType:client,expiryTime:" + (System.currentTimeMillis() + 60 * 1000); + // expires after 60 seconds + String proxyAuthParams = "entityType:proxy,expiryTime:" + (System.currentTimeMillis() + 60 * 1000); admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy", Sets.newHashSet(AuthAction.consume, AuthAction.produce)); @@ -231,16 +234,18 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase { proxyService.start(); final String proxyServiceUrl = proxyService.getServiceUrl(); - // Step 3: Pass correct client params + // Step 3: Pass correct client params and use multiple connections @Cleanup - PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 1); + PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientAuthParams, 3); proxyClient.newProducer(Schema.BYTES).topic(topicName).create(); - // Sleep for 4 seconds - wait for proxy auth params to expire - Thread.sleep(4 * 1000); proxyClient.newProducer(Schema.BYTES).topic(topicName).create(); - // Sleep for 3 
seconds - wait for client auth parans to expire - Thread.sleep(3 * 1000); proxyClient.newProducer(Schema.BYTES).topic(topicName).create(); + + // Step 4: Ensure that all client contexts share the same auth provider + Assert.assertTrue(proxyService.getClientCnxs().size() >= 3, "expect at least 3 clients"); + proxyService.getClientCnxs().stream().forEach((cnx) -> { + Assert.assertSame(cnx.authenticationProvider, proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication")); + }); } private void updateAdminClient() throws PulsarClientException {
