This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new db95281134a [fix][proxy] Close client connection immediately when credentials expire and forwardAuthorizationCredentials is disabled (#25179)
db95281134a is described below

commit db95281134a7d026343d7f57429f03a540ea91cd
Author: Zixuan Liu <[email protected]>
AuthorDate: Sat Jan 24 11:20:40 2026 +0800

    [fix][proxy] Close client connection immediately when credentials expire and forwardAuthorizationCredentials is disabled (#25179)
    
    (cherry picked from commit 334847073e2c926eac5c486a5cfda1d1dd42634a)
---
 .../pulsar/proxy/server/ProxyConnection.java       |  55 +-
 .../proxy/server/ProxyAuthenticationTest.java      | 562 +++++++++++++--------
 .../pulsar/proxy/server/ProxyRefreshAuthTest.java  |  48 +-
 3 files changed, 411 insertions(+), 254 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 354d145a954..54df98abb8c 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -146,7 +146,12 @@ public class ProxyConnection extends PulsarHandler {
 
         Closing,
 
-        Closed,
+        Closed;
+
+        boolean isAuthenticatedState() {
+            return this == ProxyLookupRequests
+                    || this == ProxyConnectionToBroker;
+        }
     }
 
     ConnectionPool getConnectionPool() {
@@ -397,15 +402,7 @@ public class ProxyConnection extends PulsarHandler {
 
             state = State.ProxyLookupRequests;
             lookupProxyHandler = service.newLookupProxyHandler(this);
-            if (service.getConfiguration().isAuthenticationEnabled()
-                    && 
service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0) {
-                authRefreshTask = ctx.executor().scheduleAtFixedRate(
-                        Runnables.catchingAndLoggingThrowables(
-                                
this::refreshAuthenticationCredentialsAndCloseIfTooExpired),
-                        
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
-                        
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
-                        TimeUnit.SECONDS);
-            }
+            startAuthRefreshTaskIfNotStarted();
             final ByteBuf msg = 
Commands.newConnected(protocolVersionToAdvertise, false);
             writeAndFlush(msg);
         }
@@ -421,6 +418,10 @@ public class ProxyConnection extends PulsarHandler {
             final ByteBuf msg = 
Commands.newConnected(connected.getProtocolVersion(), maxMessageSize,
                     connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsTopicWatchers());
             writeAndFlush(msg);
+            // Start auth refresh task only if we are not forwarding 
authorization credentials
+            if 
(!service.getConfiguration().isForwardAuthorizationCredentials()) {
+                startAuthRefreshTaskIfNotStarted();
+            }
         } else {
             LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
                             + "Closing connection to broker '{}'.",
@@ -502,16 +503,44 @@ public class ProxyConnection extends PulsarHandler {
         }
     }
 
+    private void startAuthRefreshTaskIfNotStarted() {
+        if (service.getConfiguration().isAuthenticationEnabled()
+                && 
service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0
+                && authRefreshTask == null) {
+            authRefreshTask = ctx.executor().scheduleAtFixedRate(
+                    Runnables.catchingAndLoggingThrowables(
+                            
this::refreshAuthenticationCredentialsAndCloseIfTooExpired),
+                    
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
+                    
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
+                    TimeUnit.SECONDS);
+        }
+    }
+
     private void refreshAuthenticationCredentialsAndCloseIfTooExpired() {
         assert ctx.executor().inEventLoop();
-        if (state != State.ProxyLookupRequests) {
-            // Happens when an exception is thrown that causes this connection 
to close.
+
+        // Only check expiration in authenticated states
+        if (!state.isAuthenticatedState()) {
             return;
-        } else if (!authState.isExpired()) {
+        }
+
+        if (!authState.isExpired()) {
             // Credentials are still valid. Nothing to do at this point
             return;
         }
 
+        // If we are not forwarding authorization credentials to the broker, 
the broker cannot
+        // refresh the client's credentials. In this case, we must close the 
connection immediately
+        // when credentials expire.
+        if (!service.getConfiguration().isForwardAuthorizationCredentials()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}] Closing connection because client credentials 
have expired and "
+                        + "forwardAuthorizationCredentials is disabled (broker 
cannot refresh)", remoteAddress);
+            }
+            ctx.close();
+            return;
+        }
+
         if (System.nanoTime() - authChallengeSentTime
                 > 
TimeUnit.SECONDS.toNanos(service.getConfiguration().getAuthenticationRefreshCheckSeconds()))
 {
             LOG.warn("[{}] Closing connection after timeout on refreshing auth 
credentials", remoteAddress);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 3207c2c3d6a..b255500c463 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -18,13 +18,15 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.spy;
-
 import com.google.common.collect.Sets;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
-
 import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -32,14 +34,16 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-
+import java.util.concurrent.TimeUnit;
 import javax.naming.AuthenticationException;
-
+import javax.net.ssl.SSLSession;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -48,8 +52,11 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -58,214 +65,347 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 public class ProxyAuthenticationTest extends ProducerConsumerBase {
-       private static final Logger log = 
LoggerFactory.getLogger(ProxyAuthenticationTest.class);
-
-       public static class BasicAuthenticationData implements 
AuthenticationDataProvider {
-               private final String authParam;
-
-               public BasicAuthenticationData(String authParam) {
-                       this.authParam = authParam;
-               }
-
-               public boolean hasDataFromCommand() {
-                       return true;
-               }
-
-               public String getCommandData() {
-                       return authParam;
-               }
-
-               public boolean hasDataForHttp() {
-                       return true;
-               }
-
-               @Override
-               public Set<Entry<String, String>> getHttpHeaders() {
-                       Map<String, String> headers = new HashMap<>();
-                       headers.put("BasicAuthentication", authParam);
-                       return headers.entrySet();
-               }
-       }
-
-       public static class BasicAuthentication implements Authentication {
-
-               private String authParam;
-
-               @Override
-               public void close() throws IOException {
-                       // noop
-               }
-
-               @Override
-               public String getAuthMethodName() {
-                       return "BasicAuthentication";
-               }
-
-               @Override
-               public AuthenticationDataProvider getAuthData() throws 
PulsarClientException {
-                       try {
-                               return new BasicAuthenticationData(authParam);
-                       } catch (Exception e) {
-                               throw new PulsarClientException(e);
-                       }
-               }
-
-               @Override
-               public void configure(Map<String, String> authParams) {
-                       this.authParam = String.format("{\"entityType\": 
\"%s\", \"expiryTime\": \"%s\"}",
-                                       authParams.get("entityType"), 
authParams.get("expiryTime"));
-               }
-
-               @Override
-               public void start() throws PulsarClientException {
-                       // noop
-               }
-       }
-
-       public static class BasicAuthenticationProvider implements 
AuthenticationProvider {
-
-               @Override
-               public void close() throws IOException {
-               }
-
-               @Override
-               public void initialize(ServiceConfiguration config) throws 
IOException {
-               }
-
-               @Override
-               public String getAuthMethodName() {
-                       return "BasicAuthentication";
-               }
-
-               @Override
-               public CompletableFuture<String> 
authenticateAsync(AuthenticationDataSource authData) {
-                       String commandData = null;
-                       if (authData.hasDataFromCommand()) {
-                               commandData = authData.getCommandData();
-                       } else if (authData.hasDataFromHttp()) {
-                               commandData = 
authData.getHttpHeader("BasicAuthentication");
-                       }
-
-                       JsonObject element = 
JsonParser.parseString(commandData).getAsJsonObject();
-                       log.info("Have log of {}", element);
-                       long expiryTimeInMillis = 
Long.parseLong(element.get("expiryTime").getAsString());
-                       long currentTimeInMillis = System.currentTimeMillis();
-                       if (expiryTimeInMillis < currentTimeInMillis) {
-                               log.warn("Auth failed due to timeout");
-                               return CompletableFuture
-                                               .failedFuture(new 
AuthenticationException("Authentication data has been expired"));
-                       }
-                       final String result = 
element.get("entityType").getAsString();
-                       // Run in another thread to attempt to test the async 
logic
-                       return CompletableFuture.supplyAsync(() -> result);
-               }
-       }
-
-       @BeforeMethod
-       @Override
-       protected void setup() throws Exception {
-               conf.setAuthenticationEnabled(true);
-               conf.setAuthorizationEnabled(true);
-               
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
-               // Expires after an hour
-               conf.setBrokerClientAuthenticationParameters(
-                               "entityType:admin,expiryTime:" + 
(System.currentTimeMillis() + 3600 * 1000));
-
-               Set<String> superUserRoles = new HashSet<>();
-               superUserRoles.add("admin");
-               conf.setSuperUserRoles(superUserRoles);
-
-               Set<String> providers = new HashSet<>();
-               providers.add(BasicAuthenticationProvider.class.getName());
-               conf.setAuthenticationProviders(providers);
-
-               conf.setClusterName("test");
-               Set<String> proxyRoles = new HashSet<>();
-               proxyRoles.add("proxy");
-               conf.setProxyRoles(proxyRoles);
+    private static final Logger log = 
LoggerFactory.getLogger(ProxyAuthenticationTest.class);
+    private static final String CLUSTER_NAME = "test";
+
+    public static class BasicAuthenticationData implements 
AuthenticationDataProvider {
+        private final String authParam;
+
+        public BasicAuthenticationData(String authParam) {
+            this.authParam = authParam;
+        }
+
+        public boolean hasDataFromCommand() {
+            return true;
+        }
+
+        public String getCommandData() {
+            return authParam;
+        }
+
+        public boolean hasDataForHttp() {
+            return true;
+        }
+
+        @Override
+        public Set<Entry<String, String>> getHttpHeaders() {
+            Map<String, String> headers = new HashMap<>();
+            headers.put("BasicAuthentication", authParam);
+            headers.put("X-Pulsar-Auth-Method-Name", "BasicAuthentication");
+            return headers.entrySet();
+        }
+    }
+
+    public static class BasicAuthentication implements Authentication {
+
+        private String authParam;
+
+        @Override
+        public void close() throws IOException {
+            // noop
+        }
+
+        @Override
+        public String getAuthMethodName() {
+            return "BasicAuthentication";
+        }
+
+        @Override
+        public AuthenticationDataProvider getAuthData() throws 
PulsarClientException {
+            try {
+                return new BasicAuthenticationData(authParam);
+            } catch (Exception e) {
+                throw new PulsarClientException(e);
+            }
+        }
+
+        @Override
+        public void configure(Map<String, String> authParams) {
+            this.authParam = String.format("{\"entityType\": \"%s\", 
\"expiryTime\": \"%s\"}",
+                    authParams.get("entityType"), 
authParams.get("expiryTime"));
+        }
+
+        @Override
+        public void start() throws PulsarClientException {
+            // noop
+        }
+    }
+
+    public static class BasicAuthenticationState implements 
AuthenticationState {
+        private final long expiryTimeInMillis;
+        private final String authRole;
+        private final AuthenticationDataSource authenticationDataSource;
+
+        private static boolean isExpired(long expiryTimeInMillis) {
+            return System.currentTimeMillis() > expiryTimeInMillis;
+        }
+
+        private static String[] parseAuthData(String commandData) {
+            JsonObject element = 
JsonParser.parseString(commandData).getAsJsonObject();
+            long expiryTimeInMillis = 
Long.parseLong(element.get("expiryTime").getAsString());
+            if (isExpired(expiryTimeInMillis)) {
+                throw new IllegalArgumentException("Credentials have expired");
+            }
+            String role = element.get("entityType").getAsString();
+            return new String[]{role, String.valueOf(expiryTimeInMillis)};
+        }
+
+        public BasicAuthenticationState(AuthenticationDataSource authData) {
+            this(authData.hasDataFromCommand() ? authData.getCommandData()
+                    : authData.getHttpHeader("BasicAuthentication"));
+        }
+
+        public BasicAuthenticationState(AuthData authData) {
+            this(new String(authData.getBytes(), StandardCharsets.UTF_8));
+        }
+
+        private BasicAuthenticationState(String commandData) {
+            String[] parsed = parseAuthData(commandData);
+            this.authRole = parsed[0];
+            this.expiryTimeInMillis = Long.parseLong(parsed[1]);
+            this.authenticationDataSource = new 
AuthenticationDataCommand(commandData, null, null);
+        }
+
+        @Override
+        public String getAuthRole() {
+            return authRole;
+        }
+
+        @Override
+        public AuthData authenticate(AuthData authData) throws 
AuthenticationException {
+            return null; // Authentication complete
+        }
+
+        @Override
+        public CompletableFuture<AuthData> authenticateAsync(AuthData 
authData) {
+            return CompletableFuture.completedFuture(null); // Authentication 
complete
+        }
+
+        @Override
+        public AuthenticationDataSource getAuthDataSource() {
+            return authenticationDataSource;
+        }
+
+        @Override
+        public boolean isComplete() {
+            return authRole != null;
+        }
+
+        @Override
+        public boolean isExpired() {
+            return isExpired(expiryTimeInMillis);
+        }
+    }
+
+    public static class BasicAuthenticationProvider implements 
AuthenticationProvider {
+
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public void initialize(ServiceConfiguration config) throws IOException 
{
+        }
+
+        @Override
+        public String getAuthMethodName() {
+            return "BasicAuthentication";
+        }
+
+        @Override
+        public AuthenticationState newAuthState(AuthData authData, 
SocketAddress remoteAddress, SSLSession sslSession) {
+            return new BasicAuthenticationState(authData);
+        }
+
+        @Override
+        public CompletableFuture<String> 
authenticateAsync(AuthenticationDataSource authData) {
+            BasicAuthenticationState basicAuthenticationState = new 
BasicAuthenticationState(authData);
+            return 
CompletableFuture.supplyAsync(basicAuthenticationState::getAuthRole);
+        }
+    }
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+        
conf.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+        // Expires after an hour
+        conf.setBrokerClientAuthenticationParameters(
+                "entityType:admin,expiryTime:" + (System.currentTimeMillis() + 
3600 * 1000));
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(BasicAuthenticationProvider.class.getName());
+        conf.setAuthenticationProviders(providers);
+
+        conf.setClusterName(CLUSTER_NAME);
+        Set<String> proxyRoles = new HashSet<>();
+        proxyRoles.add("proxy");
+        conf.setProxyRoles(proxyRoles);
         conf.setAuthenticateOriginalAuthData(true);
-               super.init();
-
-               updateAdminClient();
-               producerBaseSetup();
-       }
-
-       @Override
-       @AfterMethod(alwaysRun = true)
-       protected void cleanup() throws Exception {
-               super.internalCleanup();
-       }
-
-       @Test
-       void testAuthentication() throws Exception {
-               log.info("-- Starting {} test --", methodName);
-
-               // Step 1: Create Admin Client
-               updateAdminClient();
-               // create a client which connects to proxy and pass authData
-               String namespaceName = "my-property/my-ns";
-               String topicName = "persistent://my-property/my-ns/my-topic1";
-               String subscriptionName = "my-subscriber-name";
-               // expires after 60 seconds
-               String clientAuthParams = "entityType:client,expiryTime:" + 
(System.currentTimeMillis() + 60 * 1000);
-               // expires after 60 seconds
-               String proxyAuthParams = "entityType:proxy,expiryTime:" + 
(System.currentTimeMillis() + 60 * 1000);
-
-               admin.namespaces().grantPermissionOnNamespace(namespaceName, 
"proxy",
-                               Sets.newHashSet(AuthAction.consume, 
AuthAction.produce));
-               admin.namespaces().grantPermissionOnNamespace(namespaceName, 
"client",
-                               Sets.newHashSet(AuthAction.consume, 
AuthAction.produce));
-
-               // Step 2: Try to use proxy Client as a normal Client - expect 
exception
-               ProxyConfiguration proxyConfig = new ProxyConfiguration();
-               proxyConfig.setAuthenticationEnabled(true);
-               proxyConfig.setServicePort(Optional.of(0));
-               proxyConfig.setBrokerProxyAllowedTargetPorts("*");
-               proxyConfig.setWebServicePort(Optional.of(0));
-               proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
-
-               
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
-               
proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
-
-               Set<String> providers = new HashSet<>();
-               providers.add(BasicAuthenticationProvider.class.getName());
-               proxyConfig.setAuthenticationProviders(providers);
-               proxyConfig.setForwardAuthorizationCredentials(true);
+        super.init();
+
+        updateAdminClient();
+        producerBaseSetup();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    void testAuthentication() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        // Step 1: Create Admin Client
+        updateAdminClient();
+        // create a client which connects to proxy and pass authData
+        String namespaceName = "my-property/my-ns";
+        String topicName = "persistent://my-property/my-ns/my-topic1";
+        String subscriptionName = "my-subscriber-name";
+        // expires after 60 seconds
+        String clientAuthParams = "entityType:client,expiryTime:" + 
(System.currentTimeMillis() + 60 * 1000);
+        // expires after 60 seconds
+        String proxyAuthParams = "entityType:proxy,expiryTime:" + 
(System.currentTimeMillis() + 60 * 1000);
+
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
+                Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
+                Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+        // Step 2: Try to use proxy Client as a normal Client - expect 
exception
+        ProxyConfiguration proxyConfig = new ProxyConfiguration();
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setClusterName(CLUSTER_NAME);
+
+        
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(BasicAuthenticationProvider.class.getName());
+        proxyConfig.setAuthenticationProviders(providers);
+        proxyConfig.setForwardAuthorizationCredentials(true);
                 AuthenticationService authenticationService = new 
AuthenticationService(
                         PulsarConfigurationLoader.convertFrom(proxyConfig));
-               @Cleanup
-               final Authentication proxyClientAuthentication = 
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
-                               
proxyConfig.getBrokerClientAuthenticationParameters());
-               proxyClientAuthentication.start();
-               @Cleanup
-               ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication);
-
-               proxyService.start();
-               final String proxyServiceUrl = proxyService.getServiceUrl();
-
-               // Step 3: Pass correct client params and use multiple 
connections
-               @Cleanup
-               PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, 
clientAuthParams, 3);
-               proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
-               proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
-               proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
-
-               // Step 4: Ensure that all client contexts share the same auth 
provider
-               Assert.assertTrue(proxyService.getClientCnxs().size() >= 3, 
"expect at least 3 clients");
-               proxyService.getClientCnxs().stream().forEach((cnx) -> {
-                       Assert.assertSame(cnx.authenticationProvider, 
proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication"));
-               });
-       }
-
-       private void updateAdminClient() throws PulsarClientException {
-               // Expires after an hour
-               String adminAuthParams = "entityType:admin,expiryTime:" + 
(System.currentTimeMillis() + 3600 * 1000);
-               admin = 
spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
-                               
.authentication(BasicAuthentication.class.getName(), adminAuthParams).build());
-       }
-
-       private PulsarClient createPulsarClient(String proxyServiceUrl, String 
authParams, int numberOfConnections) throws PulsarClientException {
-               return PulsarClient.builder().serviceUrl(proxyServiceUrl)
-                               
.authentication(BasicAuthentication.class.getName(), 
authParams).connectionsPerBroker(numberOfConnections).build();
-       }
+        @Cleanup
+        final Authentication proxyClientAuthentication =
+                
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+        @Cleanup
+        ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication);
+
+        proxyService.start();
+        final String proxyServiceUrl = proxyService.getServiceUrl();
+
+        // Step 3: Pass correct client params and use multiple connections
+        @Cleanup
+        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, 
clientAuthParams, 3);
+        proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
+        proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
+        proxyClient.newProducer(Schema.BYTES).topic(topicName).create();
+
+        // Step 4: Ensure that all client contexts share the same auth provider
+        Assert.assertTrue(proxyService.getClientCnxs().size() >= 3, "expect at 
least 3 clients");
+        proxyService.getClientCnxs().stream().forEach((cnx) -> {
+            Assert.assertSame(cnx.authenticationProvider,
+                    
proxyService.getAuthenticationService().getAuthenticationProvider("BasicAuthentication"));
+        });
+    }
+
+    private void updateAdminClient() throws PulsarClientException {
+        // Expires after an hour
+        String adminAuthParams = "entityType:admin,expiryTime:" + 
(System.currentTimeMillis() + 3600 * 1000);
+        admin.close();
+        admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+                .authentication(BasicAuthentication.class.getName(), 
adminAuthParams).build());
+    }
+
+    private PulsarClient createPulsarClient(String proxyServiceUrl, String 
authParams, int numberOfConnections)
+            throws PulsarClientException {
+        return PulsarClient.builder().serviceUrl(proxyServiceUrl)
+                .authentication(BasicAuthentication.class.getName(), 
authParams)
+                .connectionsPerBroker(numberOfConnections).build();
+    }
+
+    @Test
+    void testClientDisconnectWhenCredentialsExpireWithoutForwardAuth() throws 
Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        String namespaceName = "my-property/my-ns";
+        String topicName = "persistent://my-property/my-ns/my-topic1";
+
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
+                Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
+                Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+        // Important: When forwardAuthorizationCredentials=false, broker 
should not authenticate original auth data
+        // because the proxy doesn't forward it. Set 
authenticateOriginalAuthData=false to match this behavior.
+        conf.setAuthenticateOriginalAuthData(false);
+
+        ProxyConfiguration proxyConfig = new ProxyConfiguration();
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthenticationRefreshCheckSeconds(2); // Check every 2 
seconds
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setClusterName(CLUSTER_NAME);
+
+        // Proxy auth with long expiry
+        String proxyAuthParams = "entityType:proxy,expiryTime:" + 
(System.currentTimeMillis() + 3600 * 1000);
+        
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(BasicAuthenticationProvider.class.getName());
+        proxyConfig.setAuthenticationProviders(providers);
+        proxyConfig.setForwardAuthorizationCredentials(false);
+
+        @Cleanup
+        AuthenticationService authenticationService = new 
AuthenticationService(
+                PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication =
+                
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+        @Cleanup
+        ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication);
+        proxyService.start();
+        final String proxyServiceUrl = proxyService.getServiceUrl();
+
+        // Create client with credentials that will expire in 3 seconds
+        long clientExpireTime = System.currentTimeMillis() + 3 * 1000;
+        String clientAuthParams = "entityType:client,expiryTime:" + 
clientExpireTime;
+
+        @Cleanup
+        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, 
clientAuthParams, 1);
+
+        @Cleanup
+        var producer =
+                
proxyClient.newProducer(Schema.BYTES).topic(topicName).sendTimeout(5, 
TimeUnit.SECONDS).create();
+        producer.send("test message".getBytes());
+
+        Awaitility.await().untilAsserted(() -> {
+            assertThatThrownBy(() -> producer.send("test message after 
expiry".getBytes()))
+                    
.isExactlyInstanceOf(PulsarClientException.TimeoutException.class);
+        });
+
+        if (producer instanceof ProducerImpl<byte[]> producerImpl) {
+            long lastDisconnectedTimestamp = 
producerImpl.getLastDisconnectedTimestamp();
+            
assertThat(lastDisconnectedTimestamp).isGreaterThan(clientExpireTime);
+        }
+    }
 }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
index b058e4af830..56a6c64395c 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -177,37 +177,25 @@ public class ProxyRefreshAuthTest extends ProducerConsumerBase {
 
         PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
         pulsarClient.getPartitionsForTopic(topic).get();
-        Set<CompletableFuture<ClientCnx>> connections = 
pulsarClientImpl.getCnxPool().getConnections();
 
-        Awaitility.await().during(5, SECONDS).untilAsserted(() -> {
-            pulsarClient.getPartitionsForTopic(topic).get();
-            assertTrue(connections.stream().allMatch(n -> {
-                try {
-                    ClientCnx clientCnx = n.get();
-                    long timestamp = clientCnx.getLastDisconnectedTimestamp();
-                    return timestamp == 0;
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }));
-        });
+        // Verify initial connection state
+        Set<CompletableFuture<ClientCnx>> connections = 
pulsarClientImpl.getCnxPool().getConnections();
 
-        // Force all connections from proxy to broker to close and therefore 
require the proxy to re-authenticate with
-        // the broker. (The client doesn't lose this connection.)
-        restartBroker();
-
-        // Rerun assertion to ensure that it still works
-        Awaitility.await().during(5, SECONDS).untilAsserted(() -> {
-            pulsarClient.getPartitionsForTopic(topic).get();
-            assertTrue(connections.stream().allMatch(n -> {
-                try {
-                    ClientCnx clientCnx = n.get();
-                    long timestamp = clientCnx.getLastDisconnectedTimestamp();
-                    return timestamp == 0;
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }));
-        });
+        Awaitility.await()
+                .during(5, SECONDS)
+                .untilAsserted(() -> {
+                    for (CompletableFuture<ClientCnx> cf : connections) {
+                        try {
+                            ClientCnx clientCnx = cf.get();
+                            long timestamp = 
clientCnx.getLastDisconnectedTimestamp();
+                            // If forwardAuthData is false, the broker cannot 
see the client's authentication data.
+                            // As a result, the broker cannot perform any 
refresh operations on the client's auth data.
+                            // Only the proxy has visibility of the client's 
connection state.
+                            assertTrue(forwardAuthData ? timestamp == 0 : 
timestamp > 0);
+                        } catch (Exception e) {
+                            throw new AssertionError("Failed to get connection 
state", e);
+                        }
+                    }
+                });
     }
 }

Reply via email to