This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 334847073e2 [fix][proxy] Close client connection immediately when credentials expire and forwardAuthorizationCredentials is disabled (#25179)
334847073e2 is described below

commit 334847073e2c926eac5c486a5cfda1d1dd42634a
Author: Zixuan Liu <[email protected]>
AuthorDate: Sat Jan 24 11:20:40 2026 +0800

    [fix][proxy] Close client connection immediately when credentials expire and forwardAuthorizationCredentials is disabled (#25179)
---
 .../pulsar/proxy/server/ProxyConnection.java       |  55 +++++--
 .../proxy/server/ProxyAuthenticationTest.java      | 175 ++++++++++++++++++---
 .../pulsar/proxy/server/ProxyRefreshAuthTest.java  |  48 +++---
 3 files changed, 216 insertions(+), 62 deletions(-)

diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index e479b8ee622..cfe788e5229 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -149,7 +149,12 @@ public class ProxyConnection extends PulsarHandler {
 
         Closing,
 
-        Closed,
+        Closed;
+
+        boolean isAuthenticatedState() {
+            return this == ProxyLookupRequests
+                    || this == ProxyConnectionToBroker;
+        }
     }
 
     ConnectionPool getConnectionPool() {
@@ -412,15 +417,7 @@ public class ProxyConnection extends PulsarHandler {
 
             state = State.ProxyLookupRequests;
             lookupProxyHandler = service.newLookupProxyHandler(this);
-            if (service.getConfiguration().isAuthenticationEnabled()
-                    && 
service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0) {
-                authRefreshTask = ctx.executor().scheduleAtFixedRate(
-                        Runnables.catchingAndLoggingThrowables(
-                                
this::refreshAuthenticationCredentialsAndCloseIfTooExpired),
-                        
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
-                        
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
-                        TimeUnit.SECONDS);
-            }
+            startAuthRefreshTaskIfNotStarted();
             final ByteBuf msg = 
Commands.newConnected(protocolVersionToAdvertise, false);
             writeAndFlush(msg);
         }
@@ -436,6 +433,10 @@ public class ProxyConnection extends PulsarHandler {
             final ByteBuf msg = 
Commands.newConnected(connected.getProtocolVersion(), maxMessageSize,
                     connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsTopicWatchers());
             writeAndFlush(msg);
+            // Start auth refresh task only if we are not forwarding authorization credentials
+            if 
(!service.getConfiguration().isForwardAuthorizationCredentials()) {
+                startAuthRefreshTaskIfNotStarted();
+            }
         } else {
             LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
                             + "Closing connection to broker '{}'.",
@@ -517,16 +518,44 @@ public class ProxyConnection extends PulsarHandler {
         }
     }
 
+    private void startAuthRefreshTaskIfNotStarted() {
+        if (service.getConfiguration().isAuthenticationEnabled()
+                && 
service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0
+                && authRefreshTask == null) {
+            authRefreshTask = ctx.executor().scheduleAtFixedRate(
+                    Runnables.catchingAndLoggingThrowables(
+                            
this::refreshAuthenticationCredentialsAndCloseIfTooExpired),
+                    
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
+                    
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
+                    TimeUnit.SECONDS);
+        }
+    }
+
     private void refreshAuthenticationCredentialsAndCloseIfTooExpired() {
         assert ctx.executor().inEventLoop();
-        if (state != State.ProxyLookupRequests) {
-            // Happens when an exception is thrown that causes this connection to close.
+
+        // Only check expiration in authenticated states
+        if (!state.isAuthenticatedState()) {
             return;
-        } else if (!authState.isExpired()) {
+        }
+
+        if (!authState.isExpired()) {
             // Credentials are still valid. Nothing to do at this point
             return;
         }
 
+        // If we are not forwarding authorization credentials to the broker, the broker cannot
+        // refresh the client's credentials. In this case, we must close the connection immediately
+        // when credentials expire.
+        if (!service.getConfiguration().isForwardAuthorizationCredentials()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("[{}] Closing connection because client credentials 
have expired and "
+                        + "forwardAuthorizationCredentials is disabled (broker 
cannot refresh)", remoteAddress);
+            }
+            ctx.close();
+            return;
+        }
+
         if (System.nanoTime() - authChallengeSentTime
                 > 
TimeUnit.SECONDS.toNanos(service.getConfiguration().getAuthenticationRefreshCheckSeconds()))
 {
             LOG.warn("[{}] Closing connection after timeout on refreshing auth 
credentials", remoteAddress);
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 6887e9ea234..04529629de7 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -18,11 +18,15 @@
  */
 package org.apache.pulsar.proxy.server;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.spy;
 import com.google.common.collect.Sets;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -30,12 +34,16 @@ import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import javax.naming.AuthenticationException;
+import javax.net.ssl.SSLSession;
 import lombok.Cleanup;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.authentication.AuthenticationProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -44,8 +52,11 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -80,6 +91,7 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
         public Set<Entry<String, String>> getHttpHeaders() {
             Map<String, String> headers = new HashMap<>();
             headers.put("BasicAuthentication", authParam);
+            headers.put("X-Pulsar-Auth-Method-Name", "BasicAuthentication");
             return headers.entrySet();
         }
     }
@@ -119,6 +131,72 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
         }
     }
 
+    public static class BasicAuthenticationState implements 
AuthenticationState {
+        private final long expiryTimeInMillis;
+        private final String authRole;
+        private final AuthenticationDataSource authenticationDataSource;
+
+        private static boolean isExpired(long expiryTimeInMillis) {
+            return System.currentTimeMillis() > expiryTimeInMillis;
+        }
+
+        private static String[] parseAuthData(String commandData) {
+            JsonObject element = 
JsonParser.parseString(commandData).getAsJsonObject();
+            long expiryTimeInMillis = 
Long.parseLong(element.get("expiryTime").getAsString());
+            if (isExpired(expiryTimeInMillis)) {
+                throw new IllegalArgumentException("Credentials have expired");
+            }
+            String role = element.get("entityType").getAsString();
+            return new String[]{role, String.valueOf(expiryTimeInMillis)};
+        }
+
+        public BasicAuthenticationState(AuthenticationDataSource authData) {
+            this(authData.hasDataFromCommand() ? authData.getCommandData()
+                    : authData.getHttpHeader("BasicAuthentication"));
+        }
+
+        public BasicAuthenticationState(AuthData authData) {
+            this(new String(authData.getBytes(), StandardCharsets.UTF_8));
+        }
+
+        private BasicAuthenticationState(String commandData) {
+            String[] parsed = parseAuthData(commandData);
+            this.authRole = parsed[0];
+            this.expiryTimeInMillis = Long.parseLong(parsed[1]);
+            this.authenticationDataSource = new 
AuthenticationDataCommand(commandData, null, null);
+        }
+
+        @Override
+        public String getAuthRole() {
+            return authRole;
+        }
+
+        @Override
+        public AuthData authenticate(AuthData authData) throws 
AuthenticationException {
+            return null; // Authentication complete
+        }
+
+        @Override
+        public CompletableFuture<AuthData> authenticateAsync(AuthData 
authData) {
+            return CompletableFuture.completedFuture(null); // Authentication 
complete
+        }
+
+        @Override
+        public AuthenticationDataSource getAuthDataSource() {
+            return authenticationDataSource;
+        }
+
+        @Override
+        public boolean isComplete() {
+            return authRole != null;
+        }
+
+        @Override
+        public boolean isExpired() {
+            return isExpired(expiryTimeInMillis);
+        }
+    }
+
     public static class BasicAuthenticationProvider implements 
AuthenticationProvider {
 
         @Override
@@ -135,26 +213,14 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
         }
 
         @Override
-        public CompletableFuture<String> 
authenticateAsync(AuthenticationDataSource authData) {
-            String commandData = null;
-            if (authData.hasDataFromCommand()) {
-                commandData = authData.getCommandData();
-            } else if (authData.hasDataFromHttp()) {
-                commandData = authData.getHttpHeader("BasicAuthentication");
-            }
+        public AuthenticationState newAuthState(AuthData authData, 
SocketAddress remoteAddress, SSLSession sslSession) {
+            return new BasicAuthenticationState(authData);
+        }
 
-            JsonObject element = 
JsonParser.parseString(commandData).getAsJsonObject();
-            log.info("Have log of {}", element);
-            long expiryTimeInMillis = 
Long.parseLong(element.get("expiryTime").getAsString());
-            long currentTimeInMillis = System.currentTimeMillis();
-            if (expiryTimeInMillis < currentTimeInMillis) {
-                log.warn("Auth failed due to timeout");
-                return CompletableFuture
-                        .failedFuture(new 
AuthenticationException("Authentication data has been expired"));
-            }
-            final String result = element.get("entityType").getAsString();
-            // Run in another thread to attempt to test the async logic
-            return CompletableFuture.supplyAsync(() -> result);
+        @Override
+        public CompletableFuture<String> 
authenticateAsync(AuthenticationDataSource authData) {
+            BasicAuthenticationState basicAuthenticationState = new 
BasicAuthenticationState(authData);
+            return 
CompletableFuture.supplyAsync(basicAuthenticationState::getAuthRole);
         }
     }
 
@@ -271,4 +337,75 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
                 .authentication(BasicAuthentication.class.getName(), 
authParams)
                 .connectionsPerBroker(numberOfConnections).build();
     }
+
+    @Test
+    void testClientDisconnectWhenCredentialsExpireWithoutForwardAuth() throws 
Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        String namespaceName = "my-property/my-ns";
+        String topicName = "persistent://my-property/my-ns/my-topic1";
+
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
+                Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+        admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
+                Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+        // Important: When forwardAuthorizationCredentials=false, broker should not authenticate original auth data
+        // because the proxy doesn't forward it. Set authenticateOriginalAuthData=false to match this behavior.
+        conf.setAuthenticateOriginalAuthData(false);
+
+        ProxyConfiguration proxyConfig = new ProxyConfiguration();
+        proxyConfig.setAuthenticationEnabled(true);
+        proxyConfig.setAuthenticationRefreshCheckSeconds(2); // Check every 2 
seconds
+        proxyConfig.setServicePort(Optional.of(0));
+        proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+        proxyConfig.setWebServicePort(Optional.of(0));
+        proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+        proxyConfig.setClusterName(CLUSTER_NAME);
+
+        // Proxy auth with long expiry
+        String proxyAuthParams = "entityType:proxy,expiryTime:" + 
(System.currentTimeMillis() + 3600 * 1000);
+        
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+        proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(BasicAuthenticationProvider.class.getName());
+        proxyConfig.setAuthenticationProviders(providers);
+        proxyConfig.setForwardAuthorizationCredentials(false);
+
+        @Cleanup
+        AuthenticationService authenticationService = new 
AuthenticationService(
+                PulsarConfigurationLoader.convertFrom(proxyConfig));
+        @Cleanup
+        final Authentication proxyClientAuthentication =
+                
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+                proxyConfig.getBrokerClientAuthenticationParameters());
+        proxyClientAuthentication.start();
+        @Cleanup
+        ProxyService proxyService = new ProxyService(proxyConfig, 
authenticationService, proxyClientAuthentication);
+        proxyService.start();
+        final String proxyServiceUrl = proxyService.getServiceUrl();
+
+        // Create client with credentials that will expire in 3 seconds
+        long clientExpireTime = System.currentTimeMillis() + 3 * 1000;
+        String clientAuthParams = "entityType:client,expiryTime:" + 
clientExpireTime;
+
+        @Cleanup
+        PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, 
clientAuthParams, 1);
+
+        @Cleanup
+        var producer =
+                
proxyClient.newProducer(Schema.BYTES).topic(topicName).sendTimeout(5, 
TimeUnit.SECONDS).create();
+        producer.send("test message".getBytes());
+
+        Awaitility.await().untilAsserted(() -> {
+            assertThatThrownBy(() -> producer.send("test message after 
expiry".getBytes()))
+                    
.isExactlyInstanceOf(PulsarClientException.TimeoutException.class);
+        });
+
+        if (producer instanceof ProducerImpl<byte[]> producerImpl) {
+            long lastDisconnectedTimestamp = 
producerImpl.getLastDisconnectedTimestamp();
+            
assertThat(lastDisconnectedTimestamp).isGreaterThan(clientExpireTime);
+        }
+    }
 }
diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
index f3ff1362fd8..7501eb9306f 100644
--- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
+++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -179,37 +179,25 @@ public class ProxyRefreshAuthTest extends ProducerConsumerBase {
 
         PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
         pulsarClient.getPartitionsForTopic(topic).get();
-        Set<CompletableFuture<ClientCnx>> connections = 
pulsarClientImpl.getCnxPool().getConnections();
 
-        Awaitility.await().during(5, SECONDS).untilAsserted(() -> {
-            pulsarClient.getPartitionsForTopic(topic).get();
-            assertTrue(connections.stream().allMatch(n -> {
-                try {
-                    ClientCnx clientCnx = n.get();
-                    long timestamp = clientCnx.getLastDisconnectedTimestamp();
-                    return timestamp == 0;
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }));
-        });
+        // Verify initial connection state
+        Set<CompletableFuture<ClientCnx>> connections = 
pulsarClientImpl.getCnxPool().getConnections();
 
-        // Force all connections from proxy to broker to close and therefore require the proxy to re-authenticate with
-        // the broker. (The client doesn't lose this connection.)
-        restartBroker();
-
-        // Rerun assertion to ensure that it still works
-        Awaitility.await().during(5, SECONDS).untilAsserted(() -> {
-            pulsarClient.getPartitionsForTopic(topic).get();
-            assertTrue(connections.stream().allMatch(n -> {
-                try {
-                    ClientCnx clientCnx = n.get();
-                    long timestamp = clientCnx.getLastDisconnectedTimestamp();
-                    return timestamp == 0;
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }));
-        });
+        Awaitility.await()
+                .during(5, SECONDS)
+                .untilAsserted(() -> {
+                    for (CompletableFuture<ClientCnx> cf : connections) {
+                        try {
+                            ClientCnx clientCnx = cf.get();
+                            long timestamp = 
clientCnx.getLastDisconnectedTimestamp();
+                            // If forwardAuthData is false, the broker cannot see the client's authentication data.
+                            // As a result, the broker cannot perform any refresh operations on the client's auth data.
+                            // Only the proxy has visibility of the client's connection state.
+                            assertTrue(forwardAuthData ? timestamp == 0 : 
timestamp > 0);
+                        } catch (Exception e) {
+                            throw new AssertionError("Failed to get connection 
state", e);
+                        }
+                    }
+                });
     }
 }

Reply via email to