This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit dd3e802b99cc729339eb79c2ae63ef36648e8ad6 Author: Shen Liu <[email protected]> AuthorDate: Fri Jan 2 21:33:46 2026 +0800 [fix][broker] Fix MultiRolesTokenAuthorizationProvider error when subscription prefix doesn't match. (#25121) Co-authored-by: druidliu <[email protected]> (cherry picked from commit 133fe203c54adf19cc3b5b2f142ea8c63e018eb9) --- .../MultiRolesTokenAuthorizationProvider.java | 6 +- .../MultiRolesTokenAuthorizationProviderTest.java | 107 +++++++++++++++++++++ 2 files changed, 112 insertions(+), 1 deletion(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java index fdab233a510..6b46289af48 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java @@ -217,7 +217,11 @@ public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationPro return CompletableFuture.completedFuture(false); } List<CompletableFuture<Boolean>> futures = new ArrayList<>(roles.size()); - roles.forEach(r -> futures.add(authorizeFunc.apply(r))); + if (roles.size() == 1) { + roles.forEach(r -> futures.add(authorizeFunc.apply(r))); + } else { + roles.forEach(r -> futures.add(authorizeFunc.apply(r).exceptionally(ex -> false))); + } return FutureUtil.waitForAny(futures, ret -> (boolean) ret).thenApply(v -> v.isPresent()); }); } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProviderTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProviderTest.java index e6818dca4c0..094152a5c44 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProviderTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProviderTest.java @@ -21,14 +21,17 @@ package org.apache.pulsar.broker.authorization; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; import io.jsonwebtoken.Jwts; import io.jsonwebtoken.SignatureAlgorithm; import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import javax.crypto.SecretKey; import lombok.Cleanup; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; @@ -304,4 +307,108 @@ public class MultiRolesTokenAuthorizationProviderTest { assertTrue(provider.authorize("admin1", null, authorizeFunc).get()); assertFalse(provider.authorize("admin2", null, authorizeFunc).get()); } + + /** + * Test subscription prefix mismatch exception handling. + * <p> + * Scenario 1: One role authorization succeeds, another role throws subscription prefix mismatch exception + * -> Returns true (exception is swallowed) + * Scenario 2: All roles throw subscription prefix mismatch exception -> Returns false + */ + @Test + public void testMultiRolesAuthzWithSubscriptionPrefixMismatchException() throws Exception { + SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + String userA = "user-a"; + String userB = "user-b"; + String token = Jwts.builder() + .claim("sub", new String[]{userA, userB}) + .signWith(secretKey).compact(); + + MultiRolesTokenAuthorizationProvider provider = new MultiRolesTokenAuthorizationProvider(); + ServiceConfiguration conf = new ServiceConfiguration(); + provider.initialize(conf, mock(PulsarResources.class)); + + AuthenticationDataSource ads = new AuthenticationDataSource() { + @Override + public boolean hasDataFromHttp() { + return true; + } + + @Override + public String getHttpHeader(String name) { + if (name.equals("Authorization")) { + return "Bearer " + token; + } else { + throw new IllegalArgumentException("Wrong HTTP header"); + } + } + }; + + // userA throws subscription prefix mismatch exception, userB returns true -> result should be true + assertTrue(provider.authorize("test", ads, role -> { + if (role.equals(userA)) { + CompletableFuture<Boolean> future = new CompletableFuture<>(); + future.completeExceptionally(new PulsarServerException( + "The subscription name needs to be prefixed by the authentication role")); + return future; + } + return CompletableFuture.completedFuture(true); + }).get()); + + // All roles throw subscription prefix mismatch exception -> result should be false + assertFalse(provider.authorize("test", ads, role -> { + CompletableFuture<Boolean> future = new CompletableFuture<>(); + future.completeExceptionally(new PulsarServerException( + "The subscription name needs to be prefixed by the authentication role")); + return future; + }).get()); + } + + /** + * Test single role with subscription prefix mismatch exception. + * <p> + * Single role throws subscription prefix mismatch exception -> Should throw the original exception + * (Single role keeps original behavior, does not swallow exception) + */ + @Test + public void testSingleRoleAuthzWithSubscriptionPrefixMismatchException() throws Exception { + SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + String userA = "user-a"; + String token = Jwts.builder() + .claim("sub", userA) + .signWith(secretKey).compact(); + + MultiRolesTokenAuthorizationProvider provider = new MultiRolesTokenAuthorizationProvider(); + ServiceConfiguration conf = new ServiceConfiguration(); + provider.initialize(conf, mock(PulsarResources.class)); + + AuthenticationDataSource ads = new AuthenticationDataSource() { + @Override + public boolean hasDataFromHttp() { + return true; + } + + @Override + public String getHttpHeader(String name) { + if (name.equals("Authorization")) { + return "Bearer " + token; + } else { + throw new IllegalArgumentException("Wrong HTTP header"); + } + } + }; + + // Single role throws subscription prefix mismatch exception -> should propagate exception + ExecutionException ex = expectThrows(ExecutionException.class, () -> { + provider.authorize("test", ads, role -> { + CompletableFuture<Boolean> future = new CompletableFuture<>(); + future.completeExceptionally(new PulsarServerException( + "The subscription name needs to be prefixed by the authentication role")); + return future; + }).get(); + }); + assertTrue(ex.getCause() instanceof PulsarServerException); + assertTrue(ex.getCause().getMessage().contains( + "The subscription name needs to be prefixed by the authentication role")); + } }
