This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 960b80f59e8b8700e6a5d30aca7ceb784d267ced Author: Jiwei Guo <[email protected]> AuthorDate: Mon May 9 09:11:19 2022 +0800 [fix][broker] Fix MultiRolesTokenAuthorizationProvider `authorize` issue. (#15454) (cherry picked from commit 19f61d53b88bb195fabb367be722694902c79d22) --- .../MultiRolesTokenAuthorizationProvider.java | 37 ++++------------- .../org/apache/pulsar/common/util/FutureUtil.java | 47 +++++++++++++++++++++- .../apache/pulsar/common/util/FutureUtilTest.java | 45 +++++++++++++++++++++ 3 files changed, 98 insertions(+), 31 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java index c508ccbd5b4..b8f46a52483 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/MultiRolesTokenAuthorizationProvider.java @@ -23,10 +23,15 @@ import io.jsonwebtoken.Jwt; import io.jsonwebtoken.JwtParser; import io.jsonwebtoken.Jwts; import io.jsonwebtoken.RequiredTypeException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; -import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -39,14 +44,6 @@ import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.function.Function; - public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationProvider { private static final Logger log = LoggerFactory.getLogger(MultiRolesTokenAuthorizationProvider.class); @@ -137,27 +134,7 @@ public class MultiRolesTokenAuthorizationProvider extends PulsarAuthorizationPro } List<CompletableFuture<Boolean>> futures = new ArrayList<>(roles.size()); roles.forEach(r -> futures.add(authorizeFunc.apply(r))); - return CompletableFuture.supplyAsync(() -> { - do { - try { - List<CompletableFuture<Boolean>> doneFutures = new ArrayList<>(); - FutureUtil.waitForAny(futures).get(); - for (CompletableFuture<Boolean> future : futures) { - if (!future.isDone()) continue; - doneFutures.add(future); - if (future.get()) { - futures.forEach(f -> { - if (!f.isDone()) f.cancel(false); - }); - return true; - } - } - futures.removeAll(doneFutures); - } catch (InterruptedException | ExecutionException ignored) { - } - } while (!futures.isEmpty()); - return false; - }); + return FutureUtil.waitForAny(futures, ret -> (boolean) ret).thenApply(v -> v.isPresent()); } /** diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java index 2cdd9fce995..dac204db98e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java @@ -19,6 +19,7 @@ package org.apache.pulsar.common.util; import java.time.Duration; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -28,7 +29,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; /** * This class is aimed at simplifying work with {@code CompletableFuture}. @@ -51,10 +54,52 @@ public class FutureUtil { * @param futures futures to wait any * @return a new CompletableFuture that is completed when any of the given CompletableFutures complete */ - public static CompletableFuture<Object> waitForAny(List<? extends CompletableFuture<?>> futures) { + public static CompletableFuture<Object> waitForAny(Collection<? extends CompletableFuture<?>> futures) { return CompletableFuture.anyOf(futures.toArray(new CompletableFuture[0])); } + /** + * Return a future that represents the completion of any future that match the predicate in the provided Collection. + * + * @param futures futures to wait any + * @param tester if any future match the predicate + * @return a new CompletableFuture that is completed when any of the given CompletableFutures match the tester + */ + public static CompletableFuture<Optional<Object>> waitForAny(Collection<? extends CompletableFuture<?>> futures, + Predicate<Object> tester) { + return waitForAny(futures).thenCompose(v -> { + if (tester.test(v)) { + futures.forEach(f -> { + if (!f.isDone()) { + f.cancel(true); + } + }); + return CompletableFuture.completedFuture(Optional.of(v)); + } + Collection<CompletableFuture<?>> doneFutures = futures.stream() + .filter(f -> f.isDone()) + .collect(Collectors.toList()); + futures.removeAll(doneFutures); + Optional<?> value = doneFutures.stream() + .filter(f -> !f.isCompletedExceptionally()) + .map(CompletableFuture::join) + .filter(tester) + .findFirst(); + if (!value.isPresent()) { + if (futures.size() == 0) { + return CompletableFuture.completedFuture(Optional.empty()); + } + return waitForAny(futures, tester); + } + futures.forEach(f -> { + if (!f.isDone()) { + f.cancel(true); + } + }); + return CompletableFuture.completedFuture(Optional.of(value.get())); + }); + } + /** * Return a future that represents the completion of the futures in the provided list. diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java index b9458bf8e1e..0de40767656 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java @@ -25,13 +25,18 @@ import static org.testng.Assert.fail; import java.io.PrintWriter; import java.io.StringWriter; import java.time.Duration; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; import lombok.Cleanup; +import org.assertj.core.util.Lists; import org.testng.annotations.Test; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; public class FutureUtilTest { @@ -91,4 +96,44 @@ public class FutureUtilTest { assertEquals(executionException.getCause(), e); } } + + @Test + public void testWaitForAny() { + CompletableFuture<String> f1 = new CompletableFuture<>(); + CompletableFuture<String> f2 = new CompletableFuture<>(); + CompletableFuture<String> f3 = new CompletableFuture<>(); + CompletableFuture<String> f4 = new CompletableFuture<>(); + f1.complete("1"); + f2.complete("2"); + f3.complete("3"); + f4.complete("4"); + CompletableFuture<Optional<Object>> ret = FutureUtil.waitForAny(Lists.newArrayList(f1, f2, f3, f4), p -> p.equals("3")); + assertEquals(ret.join().get(), "3"); + // test not matched predicate result + CompletableFuture<String> f5 = new CompletableFuture<>(); + CompletableFuture<String> f6 = new CompletableFuture<>(); + f5.complete("5"); + f6.complete("6"); + ret = FutureUtil.waitForAny(Lists.newArrayList(f5, f6), p -> p.equals("3")); + assertFalse(ret.join().isPresent()); + // test one complete, others are cancelled. + CompletableFuture<String> f55 = new CompletableFuture<>(); + CompletableFuture<String> f66 = new CompletableFuture<>(); + f55.complete("55"); + ret = FutureUtil.waitForAny(Lists.newArrayList(f55, f66), p -> p.equals("55")); + assertTrue(ret.join().isPresent()); + assertTrue(f66.isCancelled()); + // test with exception + CompletableFuture<String> f7 = new CompletableFuture<>(); + CompletableFuture<String> f8 = new CompletableFuture<>(); + f8.completeExceptionally(new RuntimeException("f7 exception")); + f8.completeExceptionally(new RuntimeException("f8 exception")); + ret = FutureUtil.waitForAny(Lists.newArrayList(f7, f8), p -> p.equals("3")); + try { + ret.join(); + fail("Should have failed"); + } catch (CompletionException ex) { + assertTrue(ex.getCause() instanceof RuntimeException); + } + } } \ No newline at end of file
