This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 483b147 [Broker] Avoid thread deadlock problem when creating topic
policy reader (#13837)
483b147 is described below
commit 483b1471285d00a983c3dbfbc1488e709485d55b
Author: ran <[email protected]>
AuthorDate: Thu Jan 20 20:50:53 2022 +0800
[Broker] Avoid thread deadlock problem when creating topic policy reader
(#13837)
Currently, the topic policy reader is created on the `executor` of the
`PulsarService`, and this executor is also used to search for the candidate
broker. If both end up on the same thread under some conditions, the lookup
request is blocked, which results in a lookup request timeout.


We can see that the lookup request was blocked for 1 minute, until the
lookup request timed out. The thread `pulsar-2-8` was blocked by the topic
policy reader creation.
Change the topic policy reader creation to be asynchronous. Modify the
method `RetryUtil.retryAsynchronously` to handle asynchronous execution.
Add a new test to verify that consumers can be created successfully when the
topic policy feature is enabled.
(cherry picked from commit 760bfec51ed6f043024c3c42bc73639fc2c117bb)
---
.../SystemTopicBasedTopicPoliciesService.java | 12 ++------
.../broker/service/TopicPoliciesService.java | 6 ++--
.../SystemTopicBasedTopicPoliciesServiceTest.java | 9 +++---
.../systopic/PartitionedSystemTopicTest.java | 33 +++++++++++++++++++++
.../apache/pulsar/client/impl/RetryUtilTest.java | 13 +++++----
.../org/apache/pulsar/client/util/RetryUtil.java | 34 ++++++++++------------
6 files changed, 69 insertions(+), 38 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 0b305ac..c91f66a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -226,7 +226,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
private void prepareInitPoliciesCache(NamespaceName namespace,
CompletableFuture<Void> result) {
if (policyCacheInitMap.putIfAbsent(namespace, false) == null) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
- creatSystemTopicClientWithRetry(namespace);
+ createSystemTopicClientWithRetry(namespace);
readerCaches.put(namespace, readerCompletableFuture);
readerCompletableFuture.whenComplete((reader, ex) -> {
if (ex != null) {
@@ -242,7 +242,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
}
}
- protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
creatSystemTopicClientWithRetry(
+ protected CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
createSystemTopicClientWithRetry(
NamespaceName namespace) {
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> result = new
CompletableFuture<>();
try {
@@ -254,13 +254,7 @@ public class SystemTopicBasedTopicPoliciesService
implements TopicPoliciesServic
SystemTopicClient<PulsarEvent> systemTopicClient =
namespaceEventsSystemTopicFactory
.createTopicPoliciesSystemTopicClient(namespace);
Backoff backoff = new Backoff(1, TimeUnit.SECONDS, 3,
TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
- RetryUtil.retryAsynchronously(() -> {
- try {
- return systemTopicClient.newReader();
- } catch (PulsarClientException e) {
- throw new RuntimeException(e);
- }
- }, backoff, pulsarService.getExecutor(), result);
+ RetryUtil.retryAsynchronously(systemTopicClient::newReaderAsync,
backoff, pulsarService.getExecutor(), result);
return result;
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 48d2f1e..b3b1038 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -78,11 +78,13 @@ public interface TopicPoliciesService {
.create() : backoff;
try {
RetryUtil.retryAsynchronously(() -> {
+ CompletableFuture<Optional<TopicPolicies>> future = new
CompletableFuture<>();
try {
- return Optional.ofNullable(getTopicPolicies(topicName));
+
future.complete(Optional.ofNullable(getTopicPolicies(topicName)));
} catch
(BrokerServiceException.TopicPoliciesCacheNotInitException exception) {
- throw new RuntimeException(exception);
+ future.completeExceptionally(exception);
}
+ return future;
}, usedBackoff, scheduledExecutorService, response);
} catch (Exception e) {
response.completeExceptionally(e);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 9a489cf..8b5f420 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.service;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -56,6 +55,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
@@ -313,7 +313,7 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
try {
service.getTopicPoliciesAsyncWithRetry(TOPIC1, backoff,
pulsar.getExecutor()).get();
} catch (Exception e) {
- assertTrue(e.getCause().getCause() instanceof
TopicPoliciesCacheNotInitException);
+ assertTrue(e.getCause() instanceof
TopicPoliciesCacheNotInitException);
}
long cost = System.currentTimeMillis() - start;
assertTrue("actual:" + cost, cost >= 5000 - 1000);
@@ -333,9 +333,10 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
SystemTopicClient.Reader<PulsarEvent> reader =
mock(SystemTopicClient.Reader.class);
// Throw an exception first, create successfully after retrying
- doThrow(new
PulsarClientException("test")).doReturn(reader).when(client).newReader();
+ doReturn(FutureUtil.failedFuture(new PulsarClientException("test")))
+
.doReturn(CompletableFuture.completedFuture(reader)).when(client).newReaderAsync();
- SystemTopicClient.Reader<PulsarEvent> reader1 =
service.creatSystemTopicClientWithRetry(null).get();
+ SystemTopicClient.Reader<PulsarEvent> reader1 =
service.createSystemTopicClientWithRetry(null).get();
assertEquals(reader1, reader);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
index bd4ef78..7dd02bd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/systopic/PartitionedSystemTopicTest.java
@@ -18,14 +18,23 @@
*/
package org.apache.pulsar.broker.systopic;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.RandomStringUtils;
import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
@Test(groups = "broker")
public class PartitionedSystemTopicTest extends BrokerTestBase {
@@ -66,4 +75,28 @@ public class PartitionedSystemTopicTest extends
BrokerTestBase {
Assert.assertEquals(partitions, PARTITIONS);
Assert.assertEquals(admin.topics().getList(ns).size(), PARTITIONS);
}
+
+ @Test(timeOut = 1000 * 60)
+ public void testConsumerCreationWhenEnablingTopicPolicy() throws Exception
{
+ String tenant = "tenant-" +
RandomStringUtils.randomAlphabetic(4).toLowerCase();
+ admin.tenants().createTenant(tenant, new
TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("test")));
+ int namespaceCount = 30;
+ for (int i = 0; i < namespaceCount; i++) {
+ String ns = tenant + "/ns-" + i;
+ admin.namespaces().createNamespace(ns, 4);
+ String topic = ns + "/t1";
+ admin.topics().createPartitionedTopic(topic, 2);
+ }
+
+ List<CompletableFuture<Consumer<byte[]>>> futureList = new
ArrayList<>();
+ for (int i = 0; i < namespaceCount; i++) {
+ String topic = tenant + "/ns-" + i + "/t1";
+ futureList.add(pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("sub")
+ .subscribeAsync());
+ }
+ FutureUtil.waitForAll(futureList).get();
+ }
+
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
index e17f376..f554430 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RetryUtilTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;
import org.apache.pulsar.client.util.RetryUtil;
+import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.Test;
import java.util.concurrent.CompletableFuture;
@@ -45,11 +46,14 @@ public class RetryUtilTest {
.setMandatoryStop(5000, TimeUnit.MILLISECONDS)
.create();
RetryUtil.retryAsynchronously(() -> {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
atomicInteger.incrementAndGet();
if (atomicInteger.get() < 5) {
- throw new RuntimeException("fail");
+ future.completeExceptionally(new RuntimeException("fail"));
+ } else {
+ future.complete(true);
}
- return true;
+ return future;
}, backoff, executor, callback);
assertTrue(callback.get());
assertEquals(atomicInteger.get(), 5);
@@ -66,9 +70,8 @@ public class RetryUtilTest {
.setMandatoryStop(5000, TimeUnit.MILLISECONDS)
.create();
long start = System.currentTimeMillis();
- RetryUtil.retryAsynchronously(() -> {
- throw new RuntimeException("fail");
- }, backoff, executor, callback);
+ RetryUtil.retryAsynchronously(() ->
+ FutureUtil.failedFuture(new RuntimeException("fail")),
backoff, executor, callback);
try {
callback.get();
} catch (Exception e) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
index 084a583..b3ed2c3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/RetryUtil.java
@@ -30,7 +30,7 @@ import java.util.function.Supplier;
public class RetryUtil {
private static final Logger log = LoggerFactory.getLogger(RetryUtil.class);
- public static <T> void retryAsynchronously(Supplier<T> supplier, Backoff
backoff,
+ public static <T> void retryAsynchronously(Supplier<CompletableFuture<T>>
supplier, Backoff backoff,
ScheduledExecutorService
scheduledExecutorService,
CompletableFuture<T> callback) {
if (backoff.getMax() <= 0) {
@@ -43,26 +43,24 @@ public class RetryUtil {
executeWithRetry(supplier, backoff, scheduledExecutorService,
callback));
}
- private static <T> void executeWithRetry(Supplier<T> supplier, Backoff
backoff,
+ private static <T> void executeWithRetry(Supplier<CompletableFuture<T>>
supplier, Backoff backoff,
ScheduledExecutorService
scheduledExecutorService,
CompletableFuture<T> callback) {
- try {
- T result = supplier.get();
- callback.complete(result);
- } catch (Exception e) {
- long next = backoff.next();
- boolean isMandatoryStop = backoff.isMandatoryStopMade();
- if (isMandatoryStop) {
- callback.completeExceptionally(e);
- } else {
- if (log.isDebugEnabled()) {
- log.debug("execute with retry fail, will retry in {} ms",
next, e);
+ supplier.get().whenComplete((result, e) -> {
+ if (e != null) {
+ long next = backoff.next();
+ boolean isMandatoryStop = backoff.isMandatoryStopMade();
+ if (isMandatoryStop) {
+ callback.completeExceptionally(e);
+ } else {
+ log.warn("Execution with retry fail, because of {}, will
retry in {} ms", e.getMessage(), next);
+ scheduledExecutorService.schedule(() ->
+ executeWithRetry(supplier, backoff,
scheduledExecutorService, callback),
+ next, TimeUnit.MILLISECONDS);
}
- log.info("Because of {} , will retry in {} ms",
e.getMessage(), next);
- scheduledExecutorService.schedule(() ->
- executeWithRetry(supplier, backoff,
scheduledExecutorService, callback),
- next, TimeUnit.MILLISECONDS);
+ return;
}
- }
+ callback.complete(result);
+ });
}
}