This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 141438756c1 [fix][test] Fix flaky
SaslAuthenticateTest.testMaxInflightContext() test (#25948)
141438756c1 is described below
commit 141438756c1bae463c26c4f2f7ee77eb6353c315
Author: Oneby Wang <[email protected]>
AuthorDate: Sat Jun 6 23:29:09 2026 +0800
[fix][test] Fix flaky SaslAuthenticateTest.testMaxInflightContext() test
(#25948)
---
.../authentication/AuthenticationProviderSasl.java | 11 ++++
.../authentication/SaslAuthenticateTest.java | 70 +++++++++++++++++++---
2 files changed, 73 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
index 6dc2ad02933..5e539275af8 100644
---
a/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
+++
b/pulsar-broker-auth-sasl/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderSasl.java
@@ -35,6 +35,7 @@ import static
org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_SERVER;
import static
org.apache.pulsar.common.sasl.SaslConstants.SASL_STATE_SERVER_CHECK_TOKEN;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import java.io.IOException;
@@ -334,6 +335,16 @@ public class AuthenticationProviderSasl implements
AuthenticationProvider {
}
}
+ @VisibleForTesting
+ Cache<Long, AuthenticationState> getAuthStates() {
+ return authStates;
+ }
+
+ @VisibleForTesting
+ void setAuthStates(Cache<Long, AuthenticationState> authStates) {
+ this.authStates = authStates;
+ }
+
private String sanitizeHeaderValue(String value) {
if (value == null) {
return null;
diff --git
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
index bb8595daced..24d895dffca 100644
---
a/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
+++
b/pulsar-broker-auth-sasl/src/test/java/org/apache/pulsar/broker/authentication/SaslAuthenticateTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.ImmutableSet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
@@ -42,6 +43,9 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.security.auth.login.Configuration;
import lombok.Cleanup;
@@ -356,12 +360,11 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
}
@Test
- @SuppressWarnings("unchecked")
public void testMaxInflightContext() throws Exception {
@Cleanup
AuthenticationProviderSasl saslServer = new
AuthenticationProviderSasl();
HttpServletRequest servletRequest = mock(HttpServletRequest.class);
- doReturn("Init").when(servletRequest).getHeader("State");
+
doReturn(SaslConstants.SASL_STATE_CLIENT_INIT).when(servletRequest).getHeader(SaslConstants.SASL_HEADER_STATE);
conf.setInflightSaslContextExpiryMs(Integer.MAX_VALUE);
conf.setMaxInflightSaslContext(1);
saslServer.initialize(AuthenticationProvider.Context.builder().config(conf).build());
@@ -370,14 +373,65 @@ public class SaslAuthenticateTest extends
ProducerConsumerBase {
AuthenticationDataProvider dataProvider =
authSasl.getAuthData("localhost");
AuthData initData1 =
dataProvider.authenticate(AuthData.INIT_AUTH_DATA);
doReturn(Base64.getEncoder().encodeToString(initData1.getBytes())).when(
- servletRequest).getHeader("SASL-Token");
-
doReturn(String.valueOf(i)).when(servletRequest).getHeader("SASL-Server-ID");
+ servletRequest).getHeader(SaslConstants.SASL_AUTH_TOKEN);
+
doReturn(String.valueOf(i)).when(servletRequest).getHeader(SaslConstants.SASL_STATE_SERVER);
saslServer.authenticateHttpRequest(servletRequest,
mock(HttpServletResponse.class));
}
- Field field =
AuthenticationProviderSasl.class.getDeclaredField("authStates");
- field.setAccessible(true);
- Cache<Long, AuthenticationState> cache = (Cache<Long,
AuthenticationState>) field.get(saslServer);
+ Cache<Long, AuthenticationState> cache = saslServer.getAuthStates();
//only 1 context was left in the memory
- assertEquals(cache.asMap().size(), 1);
+ // Caffeine may perform size-based eviction asynchronously, so force
maintenance before asserting.
+ cache.cleanUp();
+ assertEquals(cache.asMap().size(), conf.getMaxInflightSaslContext());
+ }
+
+ @Test
+ public void testMaxInflightContextWithDelayedCaffeineMaintenance() throws
Exception {
+ @Cleanup
+ AuthenticationProviderSasl saslServer = new
AuthenticationProviderSasl();
+ HttpServletRequest servletRequest = mock(HttpServletRequest.class);
+
doReturn(SaslConstants.SASL_STATE_CLIENT_INIT).when(servletRequest).getHeader(SaslConstants.SASL_HEADER_STATE);
+ conf.setInflightSaslContextExpiryMs(Integer.MAX_VALUE);
+ conf.setMaxInflightSaslContext(1);
+
saslServer.initialize(AuthenticationProvider.Context.builder().config(conf).build());
+
+ CountDownLatch maintenanceStarted = new CountDownLatch(1);
+ CountDownLatch allowMaintenance = new CountDownLatch(1);
+ @Cleanup("shutdownNow")
+ ExecutorService maintenanceExecutor =
Executors.newSingleThreadExecutor();
+ Cache<Long, AuthenticationState> delayedMaintenanceCache =
Caffeine.newBuilder()
+ .maximumSize(conf.getMaxInflightSaslContext())
+ .expireAfterWrite(conf.getInflightSaslContextExpiryMs(),
TimeUnit.MILLISECONDS)
+ .executor(command -> maintenanceExecutor.execute(() -> {
+ maintenanceStarted.countDown();
+ try {
+ allowMaintenance.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ command.run();
+ }))
+ .build();
+ saslServer.setAuthStates(delayedMaintenanceCache);
+
+ try {
+ for (int i = 0; i < 10; i++) {
+ AuthenticationDataProvider dataProvider =
authSasl.getAuthData("localhost");
+ AuthData initData1 =
dataProvider.authenticate(AuthData.INIT_AUTH_DATA);
+
doReturn(Base64.getEncoder().encodeToString(initData1.getBytes())).when(
+
servletRequest).getHeader(SaslConstants.SASL_AUTH_TOKEN);
+
doReturn(String.valueOf(i)).when(servletRequest).getHeader(SaslConstants.SASL_STATE_SERVER);
+ saslServer.authenticateHttpRequest(servletRequest,
mock(HttpServletResponse.class));
+ }
+ assertTrue(maintenanceStarted.await(5, TimeUnit.SECONDS));
+
+ Cache<Long, AuthenticationState> cache =
saslServer.getAuthStates();
+ assertTrue(cache.asMap().size() >
conf.getMaxInflightSaslContext());
+
+ // Caffeine may perform size-based eviction asynchronously, so
force maintenance before asserting.
+ cache.cleanUp();
+ assertEquals(cache.asMap().size(),
conf.getMaxInflightSaslContext());
+ } finally {
+ allowMaintenance.countDown();
+ }
}
}