This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f33115b7e60cf4d9e2bc4084d928b73cc970c4c1
Author: Bharani Chadalavada <[email protected]>
AuthorDate: Tue Mar 15 07:12:45 2022 -0700

    [ Issue 14633] [pulsar-broker] Fix metadata store deadlock when checking BacklogQuota (#14634)

    (cherry picked from commit 06ed9445bf29ea30b1f094b2d0ff608cb76aa3f6)
---
 .../main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java | 5 +++--
 .../main/java/org/apache/pulsar/broker/service/BrokerService.java | 3 ++-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 15303b1..e43336b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
+import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
 import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiOperation;
@@ -271,7 +272,7 @@ public class BrokersBase extends PulsarWebResource {
             @ApiResponse(code = 500, message = "Internal server error")})
     public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) {
         validateSuperUserAccess();
-        pulsar().getBrokerService().executor().execute(()->{
+        pulsar().getBrokerService().getBacklogQuotaChecker().execute(safeRun(()->{
             try {
                 pulsar().getBrokerService().monitorBacklogQuota();
                 asyncResponse.resume(Response.noContent().build());
@@ -279,7 +280,7 @@ public class BrokersBase extends PulsarWebResource {
                 LOG.error("trigger backlogQuotaCheck fail", e);
                 asyncResponse.resume(new RestException(e));
             }
-        });
+        }));
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a9d417c..0003121 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -221,6 +221,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
 
     private AuthorizationService authorizationService = null;
     private final ScheduledExecutorService statsUpdater;
+    @Getter
     private final ScheduledExecutorService backlogQuotaChecker;
 
     protected final AtomicReference<Semaphore> lookupRequestSemaphore;
@@ -1691,7 +1692,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         return this.backlogQuotaManager;
     }
 
-    public synchronized void monitorBacklogQuota() {
+    public void monitorBacklogQuota() {
         forEachTopic(topic -> {
             if (topic instanceof PersistentTopic) {
                 PersistentTopic persistentTopic = (PersistentTopic) topic;
