This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2ca5a0469a466510ff801a8e737430ee47edaf39 Author: ran <[email protected]> AuthorDate: Thu Jan 20 20:50:53 2022 +0800 [Broker] Avoid thread deadlock problem when creating topic policy reader (#13837) Currently, the topic policy reader is created on the `executor` of the `PulsarService`, and this thread is also used to search for the candidate broker. If, under some conditions, both tasks use the same thread, the lookup request is blocked and eventually results in a lookup request timeout. We observed a lookup request that was blocked for 1 minute, until the lookup request timed out. The thread `pulsar-2-8` was blocked by topic policy reader creation. Change the topic policy reader creation to be asynchronous. Modify the method `RetryUtil.retryAsynchronously` to handle asynchronous execution. Add a new test to verify that consumers can be created successfully when the topic policy feature is enabled. (cherry picked from commit 760bfec51ed6f043024c3c42bc73639fc2c117bb) --- .../SystemTopicBasedTopicPoliciesService.java | 12 ++------ .../broker/service/TopicPoliciesService.java | 6 ++-- .../SystemTopicBasedTopicPoliciesServiceTest.java | 9 +++--- .../systopic/PartitionedSystemTopicTest.java | 33 +++++++++++++++++++++ .../apache/pulsar/client/impl/RetryUtilTest.java | 13 +++++---- .../org/apache/pulsar/client/util/RetryUtil.java | 34 ++++++++++------------ 6 files changed, 69 insertions(+), 38 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 44b866f..838edfe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -231,7 +231,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic private void 
prepareInitPoliciesCache(NamespaceName namespace, CompletableFuture<Void> result) { if (policyCacheInitMap.putIfAbsent(namespace, false) == null) { CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = - creatSystemTopicClientWithRetry(namespace); + createSystemTopicClientWithRetry(namespace); readerCaches.put(namespace, readerCompletableFuture); ownedBundlesCountPerNamespace.putIfAbsent(namespace, new AtomicInteger(1)); readerCompletableFuture.whenComplete((reader, ex) -> { @@ -248,7 +248,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic } } - protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> creatSystemTopicClientWithRetry( + protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> createSystemTopicClientWithRetry( NamespaceName namespace) { CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new CompletableFuture<>(); try { @@ -260,13 +260,7 @@ public class SystemTopicBasedTopicPoliciesService implements TopicPoliciesServic SystemTopicClient<PulsarEvent> systemTopicClient = namespaceEventsSystemTopicFactory .createTopicPoliciesSystemTopicClient(namespace); Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3, TimeUnit.SECONDS, 10, TimeUnit.SECONDS); - RetryUtil.retryAsynchronously(() -> { - try { - return systemTopicClient.newReader(); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } - }, backoff, pulsarService.getExecutor(), result); + RetryUtil.retryAsynchronously(systemTopicClient::newReaderAsync, backoff, pulsarService.getExecutor(), result); return result; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java index 53de087..0165dc1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java @@ -85,11 +85,13 @@ public interface TopicPoliciesService { .create() : backoff; try { RetryUtil.retryAsynchronously(() -> { + CompletableFuture<Optional<TopicPolicies>> future = new CompletableFuture<>(); try { - return Optional.ofNullable(getTopicPolicies(topicName)); + future.complete(Optional.ofNullable(getTopicPolicies(topicName))); } catch (BrokerServiceException.TopicPoliciesCacheNotInitException exception) { - throw new RuntimeException(exception); + future.completeExceptionally(exception); } + return future; }, usedBackoff, scheduledExecutorService, response); } catch (Exception e) { response.completeExceptionally(e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index 9a489cf..8b5f420 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -20,7 +20,6 @@ package org.apache.pulsar.broker.service; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -56,6 +55,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicPolicies; +import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -313,7 +313,7 @@ public class 
SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic try { service.getTopicPoliciesAsyncWithRetry(TOPIC1, backoff, pulsar.getExecutor()).get(); } catch (Exception e) { - assertTrue(e.getCause().getCause() instanceof TopicPoliciesCacheNotInitException); + assertTrue(e.getCause() instanceof TopicPoliciesCacheNotInitException); } long cost = System.currentTimeMillis() - start; assertTrue("actual:" + cost, cost >= 5000 - 1000); @@ -333,9 +333,10 @@ public class SystemTopicBasedTopicPoliciesServiceTest extends MockedPulsarServic SystemTopicClient.Reader<PulsarEvent> reader = mock(SystemTopicClient.Reader.class); // Throw an exception first, create successfully after retrying - doThrow(new PulsarClientException("test")).doReturn(reader).when(client).newReader(); + doReturn(FutureUtil.failedFuture(new PulsarClientException("test"))) + .doReturn(CompletableFuture.completedFuture(reader)).when(client).newReaderAsync(); - SystemTopicClient.Reader<PulsarEvent> reader1 = service.creatSystemTopicClientWithRetry(null).get(); + SystemTopicClient.Reader<PulsarEvent> reader1 = service.createSystemTopicClientWithRetry(null).get(); assertEquals(reader1, reader); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java index bd4ef78..7dd02bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java @@ -18,14 +18,23 @@ */ package org.apache.pulsar.broker.systopic; +import com.google.common.collect.Sets; +import org.apache.commons.lang.RandomStringUtils; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.naming.NamespaceName; +import 
org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + @Test(groups = "broker") public class PartitionedSystemTopicTest extends BrokerTestBase { @@ -66,4 +75,28 @@ public class PartitionedSystemTopicTest extends BrokerTestBase { Assert.assertEquals(partitions, PARTITIONS); Assert.assertEquals(admin.topics().getList(ns).size(), PARTITIONS); } + + @Test(timeOut = 1000 * 60) + public void testConsumerCreationWhenEnablingTopicPolicy() throws Exception { + String tenant = "tenant-" + RandomStringUtils.randomAlphabetic(4).toLowerCase(); + admin.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test"))); + int namespaceCount = 30; + for (int i = 0; i < namespaceCount; i++) { + String ns = tenant + "/ns-" + i; + admin.namespaces().createNamespace(ns, 4); + String topic = ns + "/t1"; + admin.topics().createPartitionedTopic(topic, 2); + } + + List<CompletableFuture<Consumer<byte[]>>> futureList = new ArrayList<>(); + for (int i = 0; i < namespaceCount; i++) { + String topic = tenant + "/ns-" + i + "/t1"; + futureList.add(pulsarClient.newConsumer() + .topic(topic) + .subscriptionName("sub") + .subscribeAsync()); + } + FutureUtil.waitForAll(futureList).get(); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java index e17f376..f554430 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.impl; import org.apache.pulsar.client.util.RetryUtil; +import 
org.apache.pulsar.common.util.FutureUtil; import org.testng.annotations.Test; import java.util.concurrent.CompletableFuture; @@ -45,11 +46,14 @@ public class RetryUtilTest { .setMandatoryStop(5000, TimeUnit.MILLISECONDS) .create(); RetryUtil.retryAsynchronously(() -> { + CompletableFuture<Boolean> future = new CompletableFuture<>(); atomicInteger.incrementAndGet(); if (atomicInteger.get() < 5) { - throw new RuntimeException("fail"); + future.completeExceptionally(new RuntimeException("fail")); + } else { + future.complete(true); } - return true; + return future; }, backoff, executor, callback); assertTrue(callback.get()); assertEquals(atomicInteger.get(), 5); @@ -66,9 +70,8 @@ public class RetryUtilTest { .setMandatoryStop(5000, TimeUnit.MILLISECONDS) .create(); long start = System.currentTimeMillis(); - RetryUtil.retryAsynchronously(() -> { - throw new RuntimeException("fail"); - }, backoff, executor, callback); + RetryUtil.retryAsynchronously(() -> + FutureUtil.failedFuture(new RuntimeException("fail")), backoff, executor, callback); try { callback.get(); } catch (Exception e) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java index 084a583..b3ed2c3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java @@ -30,7 +30,7 @@ import java.util.function.Supplier; public class RetryUtil { private static final Logger log = LoggerFactory.getLogger(RetryUtil.class); - public static <T> void retryAsynchronously(Supplier<T> supplier, Backoff backoff, + public static <T> void retryAsynchronously(Supplier<CompletableFuture<T>> supplier, Backoff backoff, ScheduledExecutorService scheduledExecutorService, CompletableFuture<T> callback) { if (backoff.getMax() <= 0) { @@ -43,26 +43,24 @@ public class RetryUtil { executeWithRetry(supplier, backoff, 
scheduledExecutorService, callback)); } - private static <T> void executeWithRetry(Supplier<T> supplier, Backoff backoff, + private static <T> void executeWithRetry(Supplier<CompletableFuture<T>> supplier, Backoff backoff, ScheduledExecutorService scheduledExecutorService, CompletableFuture<T> callback) { - try { - T result = supplier.get(); - callback.complete(result); - } catch (Exception e) { - long next = backoff.next(); - boolean isMandatoryStop = backoff.isMandatoryStopMade(); - if (isMandatoryStop) { - callback.completeExceptionally(e); - } else { - if (log.isDebugEnabled()) { - log.debug("execute with retry fail, will retry in {} ms", next, e); + supplier.get().whenComplete((result, e) -> { + if (e != null) { + long next = backoff.next(); + boolean isMandatoryStop = backoff.isMandatoryStopMade(); + if (isMandatoryStop) { + callback.completeExceptionally(e); + } else { + log.warn("Execution with retry fail, because of {}, will retry in {} ms", e.getMessage(), next); + scheduledExecutorService.schedule(() -> + executeWithRetry(supplier, backoff, scheduledExecutorService, callback), + next, TimeUnit.MILLISECONDS); } - log.info("Because of {} , will retry in {} ms", e.getMessage(), next); - scheduledExecutorService.schedule(() -> - executeWithRetry(supplier, backoff, scheduledExecutorService, callback), - next, TimeUnit.MILLISECONDS); + return; } - } + callback.complete(result); + }); } }
