This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ef2753289ec2458fb157b51f2a2bcb2ea5645859
Author: Addison Higham <[email protected]>
AuthorDate: Thu Oct 7 15:08:55 2021 -0600

    Fix Pulsar Proxy to re-use authentication instance (#12245)
    
    * Fix Pulsar Proxy to re-use authentication instance
    
    Currently, the Pulsar Proxy creates a new PulsarClientImpl with a new
    instance of the client authentication plugin.
    
    For certain client auth implementations, this can cause issues. For
    example, if a client plugin needs to generate a token and then cache and
    re-use it (which is very common with typical Pulsar client usage) this
    pattern breaks, because the client auth plugin is tied to the
    lifecycle of the connection and not the more "singleton" usage of the
    Pulsar client.
    
    Arguably, we should instead figure out how to re-use the entire Pulsar
    client, but that likely has more complexity. Instead, this "quick fix"
    will get one of the most obvious cases solved.
    
    * Add test ensuring all connections share the same auth instance
    
    * Simplify ProxyAuthTest
    
    It isn't clear why this test was doing timeouts. It doesn't really
    seem to be testing anything, as the auth token was not being refreshed
    and it appears the test was expected to pass (it almost looks like the
    producer creations after each sleep were expected to fail?).
    
    This allows us to make this test faster and more reliable, as the
    timeouts don't really seem to be adding anything.
    
    (cherry picked from commit 6b93a1688b25b937379b763b4e2cb7c5423fb41a)
---
 .../pulsar/proxy/server/ProxyConnection.java       |  9 ++------
 .../apache/pulsar/proxy/server/ProxyService.java   | 16 +++++++++++++-
 .../proxy/server/ProxyAuthenticationTest.java      | 25 +++++++++++++---------
 3 files changed, 32 insertions(+), 18 deletions(-)

diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index dd814cf..f1b7807 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -71,7 +71,6 @@ public class ProxyConnection extends PulsarHandler implements 
FutureListener<Voi
     private PulsarClientImpl client;
     private ConnectionPool connectionPool;
     private ProxyService service;
-    private Authentication clientAuthentication;
     AuthenticationDataSource authenticationData;
     private State state;
     private final Supplier<SslHandler> sslHandlerSupplier;
@@ -303,7 +302,6 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
         try {
             // init authn
             this.clientConf = createClientConfiguration();
-            this.clientAuthentication = clientConf.getAuthentication();
             int protocolVersion = getProtocolVersionToAdvertise(connect);
 
             // authn not enabled, complete
@@ -427,10 +425,7 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
         ClientConfigurationData clientConf = new ClientConfigurationData();
         clientConf.setServiceUrl(service.getServiceUrl());
         ProxyConfiguration proxyConfig = service.getConfiguration();
-        if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
-            
clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
-                    proxyConfig.getBrokerClientAuthenticationParameters()));
-        }
+        clientConf.setAuthentication(this.getClientAuthentication());
         if (proxyConfig.isTlsEnabledWithBroker()) {
             clientConf.setUseTls(true);
             if (proxyConfig.isBrokerClientTlsEnabledWithKeyStore()) {
@@ -463,7 +458,7 @@ public class ProxyConnection extends PulsarHandler 
implements FutureListener<Voi
     }
 
     public Authentication getClientAuthentication() {
-        return clientAuthentication;
+        return service.getProxyClientAuthenticationPlugin();
     }
 
     @Override
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 53f5c9d..abc7a5f 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -52,12 +52,15 @@ import 
org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.ConfigurationMetadataCacheService;
 import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
-import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
 import org.apache.pulsar.proxy.stats.TopicStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,6 +74,7 @@ import com.google.common.collect.Sets;
 public class ProxyService implements Closeable {
 
     private final ProxyConfiguration proxyConfig;
+    private final Authentication proxyClientAuthentication;
     private final Timer timer;
     private String serviceUrl;
     private String serviceUrlTls;
@@ -161,6 +165,12 @@ public class ProxyService implements Closeable {
             });
         }, 60, TimeUnit.SECONDS);
         this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig);
+        if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
+            proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                    proxyConfig.getBrokerClientAuthenticationParameters());
+        } else {
+            proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
+        }
     }
 
     public void start() throws Exception {
@@ -336,5 +346,9 @@ public class ProxyService implements Closeable {
                 proxyConfig.getZookeeperSessionTimeoutMs());
     }
 
+    public Authentication getProxyClientAuthenticationPlugin() {
+        return this.proxyClientAuthentication;
+    }
+
     private static final Logger LOG = 
LoggerFactory.getLogger(ProxyService.class);
 }
diff --git 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 550aad4..57aa781 100644
--- 
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ 
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -50,6 +50,7 @@ import 
org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -145,9 +146,11 @@ public class ProxyAuthenticationTest extends 
ProducerConsumerBase {
 
                        JsonParser parser = new JsonParser();
                        JsonObject element = 
parser.parse(commandData).getAsJsonObject();
+                       log.info("Have log of {}", element);
                        long expiryTimeInMillis = 
Long.parseLong(element.get("expiryTime").getAsString());
                        long currentTimeInMillis = System.currentTimeMillis();
                        if (expiryTimeInMillis < currentTimeInMillis) {
+                               log.warn("Auth failed due to timeout");
                                throw new 
AuthenticationException("Authentication data has been expired");
                        }
                        return element.get("entityType").getAsString();
@@ -199,10 +202,10 @@ public class ProxyAuthenticationTest extends 
ProducerConsumerBase {
                String namespaceName = "my-property/my-ns";
                String topicName = "persistent://my-property/my-ns/my-topic1";
                String subscriptionName = "my-subscriber-name";
-               // expires after 6 seconds
-               String clientAuthParams = "entityType:client,expiryTime:" + 
(System.currentTimeMillis() + 6 * 1000);
-               // expires after 3 seconds
-               String proxyAuthParams = "entityType:proxy,expiryTime:" + 
(System.currentTimeMillis() + 3 * 1000);
+               // expires after 60 seconds
+               String clientAuthParams = "entityType:client,expiryTime:" + 
(System.currentTimeMillis() + 60 * 1000);
+               // expires after 60 seconds
+               String proxyAuthParams = "entityType:proxy,expiryTime:" + 
(System.currentTimeMillis() + 60 * 1000);
 
                admin.namespaces().grantPermissionOnNamespace(namespaceName, 
"proxy",
                                Sets.newHashSet(AuthAction.consume, 
AuthAction.produce));
@@ -231,16 +234,18 @@ public class ProxyAuthenticationTest extends 
ProducerConsumerBase {
                proxyService.start();
                final String proxyServiceUrl = proxyService.getServiceUrl();
 
-               // Step 3: Pass correct client params
+               // Step 3: Pass correct client params and use multiple 
connections
                @Cleanup
-               PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, 
clientAuthParams, 1);
+               PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, 
clientAuthParams, 3);
                proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
-               // Sleep for 4 seconds - wait for proxy auth params to expire
-               Thread.sleep(4 * 1000);
                proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
-               // Sleep for 3 seconds - wait for client auth parans to expire
-               Thread.sleep(3 * 1000);
                proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
+
+               // Step 4: Ensure that all client contexts share the same auth 
provider
+               Assert.assertTrue(proxyService.getClientCnxs().size() >= 3, 
"expect at least 3 clients");
+               proxyService.getClientCnxs().stream().forEach((cnx) -> {
+                       Assert.assertSame(cnx.authenticationProvider, 
proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication"));
+               });
        }
 
        private void updateAdminClient() throws PulsarClientException {

Reply via email to