This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 334847073e2 [fix][proxy] Close client connection immediately when
credentials expire and forwardAuthorizationCredentials is disabled (#25179)
334847073e2 is described below
commit 334847073e2c926eac5c486a5cfda1d1dd42634a
Author: Zixuan Liu <[email protected]>
AuthorDate: Sat Jan 24 11:20:40 2026 +0800
[fix][proxy] Close client connection immediately when credentials expire
and forwardAuthorizationCredentials is disabled (#25179)
---
.../pulsar/proxy/server/ProxyConnection.java | 55 +++++--
.../proxy/server/ProxyAuthenticationTest.java | 175 ++++++++++++++++++---
.../pulsar/proxy/server/ProxyRefreshAuthTest.java | 48 +++---
3 files changed, 216 insertions(+), 62 deletions(-)
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index e479b8ee622..cfe788e5229 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -149,7 +149,12 @@ public class ProxyConnection extends PulsarHandler {
Closing,
- Closed,
+ Closed;
+
+ boolean isAuthenticatedState() {
+ return this == ProxyLookupRequests
+ || this == ProxyConnectionToBroker;
+ }
}
ConnectionPool getConnectionPool() {
@@ -412,15 +417,7 @@ public class ProxyConnection extends PulsarHandler {
state = State.ProxyLookupRequests;
lookupProxyHandler = service.newLookupProxyHandler(this);
- if (service.getConfiguration().isAuthenticationEnabled()
- &&
service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0) {
- authRefreshTask = ctx.executor().scheduleAtFixedRate(
- Runnables.catchingAndLoggingThrowables(
-
this::refreshAuthenticationCredentialsAndCloseIfTooExpired),
-
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
-
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
- TimeUnit.SECONDS);
- }
+ startAuthRefreshTaskIfNotStarted();
final ByteBuf msg =
Commands.newConnected(protocolVersionToAdvertise, false);
writeAndFlush(msg);
}
@@ -436,6 +433,10 @@ public class ProxyConnection extends PulsarHandler {
final ByteBuf msg =
Commands.newConnected(connected.getProtocolVersion(), maxMessageSize,
connected.hasFeatureFlags() &&
connected.getFeatureFlags().isSupportsTopicWatchers());
writeAndFlush(msg);
+ // Start auth refresh task only if we are not forwarding
authorization credentials
+ if
(!service.getConfiguration().isForwardAuthorizationCredentials()) {
+ startAuthRefreshTaskIfNotStarted();
+ }
} else {
LOG.warn("[{}] Channel is {}. ProxyConnection is in {}. "
+ "Closing connection to broker '{}'.",
@@ -517,16 +518,44 @@ public class ProxyConnection extends PulsarHandler {
}
}
+ private void startAuthRefreshTaskIfNotStarted() {
+ if (service.getConfiguration().isAuthenticationEnabled()
+ &&
service.getConfiguration().getAuthenticationRefreshCheckSeconds() > 0
+ && authRefreshTask == null) {
+ authRefreshTask = ctx.executor().scheduleAtFixedRate(
+ Runnables.catchingAndLoggingThrowables(
+
this::refreshAuthenticationCredentialsAndCloseIfTooExpired),
+
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
+
service.getConfiguration().getAuthenticationRefreshCheckSeconds(),
+ TimeUnit.SECONDS);
+ }
+ }
+
private void refreshAuthenticationCredentialsAndCloseIfTooExpired() {
assert ctx.executor().inEventLoop();
- if (state != State.ProxyLookupRequests) {
- // Happens when an exception is thrown that causes this connection
to close.
+
+ // Only check expiration in authenticated states
+ if (!state.isAuthenticatedState()) {
return;
- } else if (!authState.isExpired()) {
+ }
+
+ if (!authState.isExpired()) {
// Credentials are still valid. Nothing to do at this point
return;
}
+ // If we are not forwarding authorization credentials to the broker,
the broker cannot
+ // refresh the client's credentials. In this case, we must close the
connection immediately
+ // when credentials expire.
+ if (!service.getConfiguration().isForwardAuthorizationCredentials()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("[{}] Closing connection because client credentials
have expired and "
+ + "forwardAuthorizationCredentials is disabled (broker
cannot refresh)", remoteAddress);
+ }
+ ctx.close();
+ return;
+ }
+
if (System.nanoTime() - authChallengeSentTime
>
TimeUnit.SECONDS.toNanos(service.getConfiguration().getAuthenticationRefreshCheckSeconds()))
{
LOG.warn("[{}] Closing connection after timeout on refreshing auth
credentials", remoteAddress);
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
index 6887e9ea234..04529629de7 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyAuthenticationTest.java
@@ -18,11 +18,15 @@
*/
package org.apache.pulsar.proxy.server;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.Mockito.spy;
import com.google.common.collect.Sets;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.IOException;
+import java.net.SocketAddress;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -30,12 +34,16 @@ import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
+import javax.net.ssl.SSLSession;
import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
@@ -44,8 +52,11 @@ import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerImpl;
+import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -80,6 +91,7 @@ public class ProxyAuthenticationTest extends
ProducerConsumerBase {
public Set<Entry<String, String>> getHttpHeaders() {
Map<String, String> headers = new HashMap<>();
headers.put("BasicAuthentication", authParam);
+ headers.put("X-Pulsar-Auth-Method-Name", "BasicAuthentication");
return headers.entrySet();
}
}
@@ -119,6 +131,72 @@ public class ProxyAuthenticationTest extends
ProducerConsumerBase {
}
}
+ public static class BasicAuthenticationState implements
AuthenticationState {
+ private final long expiryTimeInMillis;
+ private final String authRole;
+ private final AuthenticationDataSource authenticationDataSource;
+
+ private static boolean isExpired(long expiryTimeInMillis) {
+ return System.currentTimeMillis() > expiryTimeInMillis;
+ }
+
+ private static String[] parseAuthData(String commandData) {
+ JsonObject element =
JsonParser.parseString(commandData).getAsJsonObject();
+ long expiryTimeInMillis =
Long.parseLong(element.get("expiryTime").getAsString());
+ if (isExpired(expiryTimeInMillis)) {
+ throw new IllegalArgumentException("Credentials have expired");
+ }
+ String role = element.get("entityType").getAsString();
+ return new String[]{role, String.valueOf(expiryTimeInMillis)};
+ }
+
+ public BasicAuthenticationState(AuthenticationDataSource authData) {
+ this(authData.hasDataFromCommand() ? authData.getCommandData()
+ : authData.getHttpHeader("BasicAuthentication"));
+ }
+
+ public BasicAuthenticationState(AuthData authData) {
+ this(new String(authData.getBytes(), StandardCharsets.UTF_8));
+ }
+
+ private BasicAuthenticationState(String commandData) {
+ String[] parsed = parseAuthData(commandData);
+ this.authRole = parsed[0];
+ this.expiryTimeInMillis = Long.parseLong(parsed[1]);
+ this.authenticationDataSource = new
AuthenticationDataCommand(commandData, null, null);
+ }
+
+ @Override
+ public String getAuthRole() {
+ return authRole;
+ }
+
+ @Override
+ public AuthData authenticate(AuthData authData) throws
AuthenticationException {
+ return null; // Authentication complete
+ }
+
+ @Override
+ public CompletableFuture<AuthData> authenticateAsync(AuthData
authData) {
+ return CompletableFuture.completedFuture(null); // Authentication
complete
+ }
+
+ @Override
+ public AuthenticationDataSource getAuthDataSource() {
+ return authenticationDataSource;
+ }
+
+ @Override
+ public boolean isComplete() {
+ return authRole != null;
+ }
+
+ @Override
+ public boolean isExpired() {
+ return isExpired(expiryTimeInMillis);
+ }
+ }
+
public static class BasicAuthenticationProvider implements
AuthenticationProvider {
@Override
@@ -135,26 +213,14 @@ public class ProxyAuthenticationTest extends
ProducerConsumerBase {
}
@Override
- public CompletableFuture<String>
authenticateAsync(AuthenticationDataSource authData) {
- String commandData = null;
- if (authData.hasDataFromCommand()) {
- commandData = authData.getCommandData();
- } else if (authData.hasDataFromHttp()) {
- commandData = authData.getHttpHeader("BasicAuthentication");
- }
+ public AuthenticationState newAuthState(AuthData authData,
SocketAddress remoteAddress, SSLSession sslSession) {
+ return new BasicAuthenticationState(authData);
+ }
- JsonObject element =
JsonParser.parseString(commandData).getAsJsonObject();
- log.info("Have log of {}", element);
- long expiryTimeInMillis =
Long.parseLong(element.get("expiryTime").getAsString());
- long currentTimeInMillis = System.currentTimeMillis();
- if (expiryTimeInMillis < currentTimeInMillis) {
- log.warn("Auth failed due to timeout");
- return CompletableFuture
- .failedFuture(new
AuthenticationException("Authentication data has been expired"));
- }
- final String result = element.get("entityType").getAsString();
- // Run in another thread to attempt to test the async logic
- return CompletableFuture.supplyAsync(() -> result);
+ @Override
+ public CompletableFuture<String>
authenticateAsync(AuthenticationDataSource authData) {
+ BasicAuthenticationState basicAuthenticationState = new
BasicAuthenticationState(authData);
+ return
CompletableFuture.supplyAsync(basicAuthenticationState::getAuthRole);
}
}
@@ -271,4 +337,75 @@ public class ProxyAuthenticationTest extends
ProducerConsumerBase {
.authentication(BasicAuthentication.class.getName(),
authParams)
.connectionsPerBroker(numberOfConnections).build();
}
+
+ @Test
+ void testClientDisconnectWhenCredentialsExpireWithoutForwardAuth() throws
Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ String namespaceName = "my-property/my-ns";
+ String topicName = "persistent://my-property/my-ns/my-topic1";
+
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "proxy",
+ Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+ admin.namespaces().grantPermissionOnNamespace(namespaceName, "client",
+ Sets.newHashSet(AuthAction.consume, AuthAction.produce));
+
+ // Important: When forwardAuthorizationCredentials=false, broker
should not authenticate original auth data
+ // because the proxy doesn't forward it. Set
authenticateOriginalAuthData=false to match this behavior.
+ conf.setAuthenticateOriginalAuthData(false);
+
+ ProxyConfiguration proxyConfig = new ProxyConfiguration();
+ proxyConfig.setAuthenticationEnabled(true);
+ proxyConfig.setAuthenticationRefreshCheckSeconds(2); // Check every 2
seconds
+ proxyConfig.setServicePort(Optional.of(0));
+ proxyConfig.setBrokerProxyAllowedTargetPorts("*");
+ proxyConfig.setWebServicePort(Optional.of(0));
+ proxyConfig.setBrokerServiceURL(pulsar.getBrokerServiceUrl());
+ proxyConfig.setClusterName(CLUSTER_NAME);
+
+ // Proxy auth with long expiry
+ String proxyAuthParams = "entityType:proxy,expiryTime:" +
(System.currentTimeMillis() + 3600 * 1000);
+
proxyConfig.setBrokerClientAuthenticationPlugin(BasicAuthentication.class.getName());
+ proxyConfig.setBrokerClientAuthenticationParameters(proxyAuthParams);
+
+ Set<String> providers = new HashSet<>();
+ providers.add(BasicAuthenticationProvider.class.getName());
+ proxyConfig.setAuthenticationProviders(providers);
+ proxyConfig.setForwardAuthorizationCredentials(false);
+
+ @Cleanup
+ AuthenticationService authenticationService = new
AuthenticationService(
+ PulsarConfigurationLoader.convertFrom(proxyConfig));
+ @Cleanup
+ final Authentication proxyClientAuthentication =
+
AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
+ proxyConfig.getBrokerClientAuthenticationParameters());
+ proxyClientAuthentication.start();
+ @Cleanup
+ ProxyService proxyService = new ProxyService(proxyConfig,
authenticationService, proxyClientAuthentication);
+ proxyService.start();
+ final String proxyServiceUrl = proxyService.getServiceUrl();
+
+ // Create client with credentials that will expire in 3 seconds
+ long clientExpireTime = System.currentTimeMillis() + 3 * 1000;
+ String clientAuthParams = "entityType:client,expiryTime:" +
clientExpireTime;
+
+ @Cleanup
+ PulsarClient proxyClient = createPulsarClient(proxyServiceUrl,
clientAuthParams, 1);
+
+ @Cleanup
+ var producer =
+
proxyClient.newProducer(Schema.BYTES).topic(topicName).sendTimeout(5,
TimeUnit.SECONDS).create();
+ producer.send("test message".getBytes());
+
+ Awaitility.await().untilAsserted(() -> {
+ assertThatThrownBy(() -> producer.send("test message after
expiry".getBytes()))
+
.isExactlyInstanceOf(PulsarClientException.TimeoutException.class);
+ });
+
+ if (producer instanceof ProducerImpl<byte[]> producerImpl) {
+ long lastDisconnectedTimestamp =
producerImpl.getLastDisconnectedTimestamp();
+
assertThat(lastDisconnectedTimestamp).isGreaterThan(clientExpireTime);
+ }
+ }
}
diff --git
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
index f3ff1362fd8..7501eb9306f 100644
---
a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
+++
b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyRefreshAuthTest.java
@@ -179,37 +179,25 @@ public class ProxyRefreshAuthTest extends
ProducerConsumerBase {
PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
pulsarClient.getPartitionsForTopic(topic).get();
- Set<CompletableFuture<ClientCnx>> connections =
pulsarClientImpl.getCnxPool().getConnections();
- Awaitility.await().during(5, SECONDS).untilAsserted(() -> {
- pulsarClient.getPartitionsForTopic(topic).get();
- assertTrue(connections.stream().allMatch(n -> {
- try {
- ClientCnx clientCnx = n.get();
- long timestamp = clientCnx.getLastDisconnectedTimestamp();
- return timestamp == 0;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }));
- });
+ // Verify initial connection state
+ Set<CompletableFuture<ClientCnx>> connections =
pulsarClientImpl.getCnxPool().getConnections();
- // Force all connections from proxy to broker to close and therefore
require the proxy to re-authenticate with
- // the broker. (The client doesn't lose this connection.)
- restartBroker();
-
- // Rerun assertion to ensure that it still works
- Awaitility.await().during(5, SECONDS).untilAsserted(() -> {
- pulsarClient.getPartitionsForTopic(topic).get();
- assertTrue(connections.stream().allMatch(n -> {
- try {
- ClientCnx clientCnx = n.get();
- long timestamp = clientCnx.getLastDisconnectedTimestamp();
- return timestamp == 0;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }));
- });
+ Awaitility.await()
+ .during(5, SECONDS)
+ .untilAsserted(() -> {
+ for (CompletableFuture<ClientCnx> cf : connections) {
+ try {
+ ClientCnx clientCnx = cf.get();
+ long timestamp =
clientCnx.getLastDisconnectedTimestamp();
+ // If forwardAuthData is false, the broker cannot
see the client's authentication data.
+ // As a result, the broker cannot perform any
refresh operations on the client's auth data.
+ // Only the proxy has visibility of the client's
connection state.
+ assertTrue(forwardAuthData ? timestamp == 0 :
timestamp > 0);
+ } catch (Exception e) {
+ throw new AssertionError("Failed to get connection
state", e);
+ }
+ }
+ });
}
}