This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2ca5a0469a466510ff801a8e737430ee47edaf39
Author: ran <[email protected]>
AuthorDate: Thu Jan 20 20:50:53 2022 +0800

    [Broker] Avoid thread deadlock problem when creating topic policy reader 
(#13837)
    
    Currently, the topic policy reader is created on the `executor` of
    `PulsarService`, and the same executor is also used to search for the candidate
    broker. When both tasks land on the same thread, the lookup request is blocked
    and eventually fails with a lookup request timeout.
    
    
![image](https://user-images.githubusercontent.com/15029908/150124902-c4318182-56f2-4b31-9149-ae64a9919aa4.png)
    
    
![image](https://user-images.githubusercontent.com/15029908/150124952-66a9f095-bab2-40fa-9370-76d3f2158dac.png)
    
    We can see that the lookup request was blocked for 1 minute until it timed
    out, because the thread `pulsar-2-8` was blocked by the topic policy reader
    creation.
    
    Make the topic policy reader creation asynchronous, and change
    `RetryUtil.retryAsynchronously` to accept a `Supplier<CompletableFuture<T>>` so
    that it can retry asynchronous operations without blocking the calling thread.
    
    Add a new test to verify that consumers can be created successfully when the
    topic policy feature is enabled.
    
    (cherry picked from commit 760bfec51ed6f043024c3c42bc73639fc2c117bb)
---
 .../SystemTopicBasedTopicPoliciesService.java      | 12 ++------
 .../broker/service/TopicPoliciesService.java       |  6 ++--
 .../SystemTopicBasedTopicPoliciesServiceTest.java  |  9 +++---
 .../systopic/PartitionedSystemTopicTest.java       | 33 +++++++++++++++++++++
 .../apache/pulsar/client/impl/RetryUtilTest.java   | 13 +++++----
 .../org/apache/pulsar/client/util/RetryUtil.java   | 34 ++++++++++------------
 6 files changed, 69 insertions(+), 38 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 44b866f..838edfe 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -231,7 +231,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
     private void prepareInitPoliciesCache(NamespaceName namespace, 
CompletableFuture<Void> result) {
         if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
             CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
readerCompletableFuture =
-                    creatSystemTopicClientWithRetry(namespace);
+                    createSystemTopicClientWithRetry(namespace);
             readerCaches.put(namespace, readerCompletableFuture);
             ownedBundlesCountPerNamespace.putIfAbsent(namespace, new 
AtomicInteger(1));
             readerCompletableFuture.whenComplete((reader, ex) -> {
@@ -248,7 +248,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         }
     }
 
-    protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
creatSystemTopicClientWithRetry(
+    protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> 
createSystemTopicClientWithRetry(
             NamespaceName namespace) {
         CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new 
CompletableFuture<>();
         try {
@@ -260,13 +260,7 @@ public class SystemTopicBasedTopicPoliciesService 
implements TopicPoliciesServic
         SystemTopicClient<PulsarEvent> systemTopicClient = 
namespaceEventsSystemTopicFactory
                 .createTopicPoliciesSystemTopicClient(namespace);
         Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3, 
TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
-        RetryUtil.retryAsynchronously(() -> {
-            try {
-                return systemTopicClient.newReader();
-            } catch (PulsarClientException e) {
-                throw new RuntimeException(e);
-            }
-        }, backoff, pulsarService.getExecutor(), result);
+        RetryUtil.retryAsynchronously(systemTopicClient::newReaderAsync, 
backoff, pulsarService.getExecutor(), result);
         return result;
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 53de087..0165dc1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -85,11 +85,13 @@ public interface TopicPoliciesService {
                 .create() : backoff;
         try {
             RetryUtil.retryAsynchronously(() -> {
+                CompletableFuture<Optional<TopicPolicies>> future = new 
CompletableFuture<>();
                 try {
-                    return Optional.ofNullable(getTopicPolicies(topicName));
+                    
future.complete(Optional.ofNullable(getTopicPolicies(topicName)));
                 } catch 
(BrokerServiceException.TopicPoliciesCacheNotInitException exception) {
-                    throw new RuntimeException(exception);
+                    future.completeExceptionally(exception);
                 }
+                return future;
             }, usedBackoff, scheduledExecutorService, response);
         } catch (Exception e) {
             response.completeExceptionally(e);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 9a489cf..8b5f420 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.service;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
@@ -56,6 +55,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
@@ -313,7 +313,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
         try {
             service.getTopicPoliciesAsyncWithRetry(TOPIC1, backoff, 
pulsar.getExecutor()).get();
         } catch (Exception e) {
-            assertTrue(e.getCause().getCause() instanceof 
TopicPoliciesCacheNotInitException);
+            assertTrue(e.getCause() instanceof 
TopicPoliciesCacheNotInitException);
         }
         long cost = System.currentTimeMillis() - start;
         assertTrue("actual:" + cost, cost >= 5000 - 1000);
@@ -333,9 +333,10 @@ public class SystemTopicBasedTopicPoliciesServiceTest 
extends MockedPulsarServic
 
         SystemTopicClient.Reader<PulsarEvent> reader = 
mock(SystemTopicClient.Reader.class);
         // Throw an exception first, create successfully after retrying
-        doThrow(new 
PulsarClientException("test")).doReturn(reader).when(client).newReader();
+        doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
+                
.doReturn(CompletableFuture.completedFuture(reader)).when(client).newReaderAsync();
 
-        SystemTopicClient.Reader<PulsarEvent> reader1 = 
service.creatSystemTopicClientWithRetry(null).get();
+        SystemTopicClient.Reader<PulsarEvent> reader1 = 
service.createSystemTopicClientWithRetry(null).get();
 
         assertEquals(reader1, reader);
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index bd4ef78..7dd02bd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -18,14 +18,23 @@
  */
 package org.apache.pulsar.broker.systopic;
 
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
 @Test(groups = "broker")
 public class PartitionedSystemTopicTest extends BrokerTestBase {
 
@@ -66,4 +75,28 @@ public class PartitionedSystemTopicTest extends 
BrokerTestBase {
         Assert.assertEquals(partitions, PARTITIONS);
         Assert.assertEquals(admin.topics().getList(ns).size(), PARTITIONS);
     }
+
+    @Test(timeOut = 1000 * 60)
+    public void testConsumerCreationWhenEnablingTopicPolicy() throws Exception 
{
+        String tenant = "tenant-" + 
RandomStringUtils.randomAlphabetic(4).toLowerCase();
+        admin.tenants().createTenant(tenant, new 
TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
+        int namespaceCount = 30;
+        for (int i = 0; i < namespaceCount; i++) {
+            String ns = tenant + "/ns-" + i;
+            admin.namespaces().createNamespace(ns, 4);
+            String topic = ns + "/t1";
+            admin.topics().createPartitionedTopic(topic, 2);
+        }
+
+        List<CompletableFuture<Consumer<byte[]>>> futureList = new 
ArrayList<>();
+        for (int i = 0; i < namespaceCount; i++) {
+            String topic = tenant + "/ns-" + i + "/t1";
+            futureList.add(pulsarClient.newConsumer()
+                    .topic(topic)
+                    .subscriptionName("sub")
+                    .subscribeAsync());
+        }
+        FutureUtil.waitForAll(futureList).get();
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
index e17f376..f554430 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import org.apache.pulsar.client.util.RetryUtil;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.annotations.Test;
 
 import java.util.concurrent.CompletableFuture;
@@ -45,11 +46,14 @@ public class RetryUtilTest {
                 .setMandatoryStop(5000, TimeUnit.MILLISECONDS)
                 .create();
         RetryUtil.retryAsynchronously(() -> {
+            CompletableFuture<Boolean> future = new CompletableFuture<>();
             atomicInteger.incrementAndGet();
             if (atomicInteger.get() < 5) {
-                throw new RuntimeException("fail");
+                future.completeExceptionally(new RuntimeException("fail"));
+            } else {
+                future.complete(true);
             }
-            return true;
+            return future;
         }, backoff, executor, callback);
         assertTrue(callback.get());
         assertEquals(atomicInteger.get(), 5);
@@ -66,9 +70,8 @@ public class RetryUtilTest {
                 .setMandatoryStop(5000, TimeUnit.MILLISECONDS)
                 .create();
         long start = System.currentTimeMillis();
-        RetryUtil.retryAsynchronously(() -> {
-            throw new RuntimeException("fail");
-        }, backoff, executor, callback);
+        RetryUtil.retryAsynchronously(() ->
+                FutureUtil.failedFuture(new RuntimeException("fail")), 
backoff, executor, callback);
         try {
             callback.get();
         } catch (Exception e) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
index 084a583..b3ed2c3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
@@ -30,7 +30,7 @@ import java.util.function.Supplier;
 public class RetryUtil {
     private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
 
-    public static <T> void retryAsynchronously(Supplier<T> supplier, Backoff 
backoff,
+    public static <T> void retryAsynchronously(Supplier<CompletableFuture<T>> 
supplier, Backoff backoff,
                                                ScheduledExecutorService 
scheduledExecutorService,
                                                CompletableFuture<T> callback) {
         if (backoff.getMax() <= 0) {
@@ -43,26 +43,24 @@ public class RetryUtil {
                 executeWithRetry(supplier, backoff, scheduledExecutorService, 
callback));
     }
 
-    private static <T> void executeWithRetry(Supplier<T> supplier, Backoff 
backoff,
+    private static <T> void executeWithRetry(Supplier<CompletableFuture<T>> 
supplier, Backoff backoff,
                                              ScheduledExecutorService 
scheduledExecutorService,
                                              CompletableFuture<T> callback) {
-        try {
-            T result = supplier.get();
-            callback.complete(result);
-        } catch (Exception e) {
-            long next = backoff.next();
-            boolean isMandatoryStop = backoff.isMandatoryStopMade();
-            if (isMandatoryStop) {
-                callback.completeExceptionally(e);
-            } else {
-                if (log.isDebugEnabled()) {
-                    log.debug("execute with retry fail, will retry in {} ms", 
next, e);
+        supplier.get().whenComplete((result, e) -> {
+            if (e != null) {
+                long next = backoff.next();
+                boolean isMandatoryStop = backoff.isMandatoryStopMade();
+                if (isMandatoryStop) {
+                    callback.completeExceptionally(e);
+                } else {
+                    log.warn("Execution with retry fail, because of {}, will 
retry in {} ms", e.getMessage(), next);
+                    scheduledExecutorService.schedule(() ->
+                                    executeWithRetry(supplier, backoff, 
scheduledExecutorService, callback),
+                            next, TimeUnit.MILLISECONDS);
                 }
-                log.info("Because of {} , will retry in {} ms", 
e.getMessage(), next);
-                scheduledExecutorService.schedule(() ->
-                                executeWithRetry(supplier, backoff, 
scheduledExecutorService, callback),
-                        next, TimeUnit.MILLISECONDS);
+                return;
             }
-        }
+            callback.complete(result);
+        });
     }
 }

Reply via email to