This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1bea336e7dbea0069d528226b78e0fe29258b780 Author: Bharani Chadalavada <[email protected]> AuthorDate: Tue Mar 15 07:12:45 2022 -0700 [ Issue 14633] [pulsar-broker] Fix metadata store deadlock when checking BacklogQuota (#14634) (cherry picked from commit 06ed9445bf29ea30b1f094b2d0ff608cb76aa3f6) --- .../main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java | 5 +++-- .../main/java/org/apache/pulsar/broker/service/BrokerService.java | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index cb579ec..973efa3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin.impl; +import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import com.google.common.collect.Maps; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; @@ -270,7 +271,7 @@ public class BrokersBase extends PulsarWebResource { @ApiResponse(code = 500, message = "Internal server error")}) public void backlogQuotaCheck(@Suspended AsyncResponse asyncResponse) { validateSuperUserAccess(); - pulsar().getBrokerService().executor().execute(()->{ + pulsar().getBrokerService().getBacklogQuotaChecker().execute(safeRun(()->{ try { pulsar().getBrokerService().monitorBacklogQuota(); asyncResponse.resume(Response.noContent().build()); @@ -278,7 +279,7 @@ public class BrokersBase extends PulsarWebResource { LOG.error("trigger backlogQuotaCheck fail", e); asyncResponse.resume(new RestException(e)); } - }); + })); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index d37dfb6..1c45202 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -215,6 +215,7 @@ public class BrokerService implements Closeable { private AuthorizationService authorizationService = null; private final ScheduledExecutorService statsUpdater; + @Getter private final ScheduledExecutorService backlogQuotaChecker; protected final AtomicReference<Semaphore> lookupRequestSemaphore; @@ -1687,7 +1688,7 @@ public class BrokerService implements Closeable { return this.backlogQuotaManager; } - public synchronized void monitorBacklogQuota() { + public void monitorBacklogQuota() { forEachTopic(topic -> { if (topic instanceof PersistentTopic) { PersistentTopic persistentTopic = (PersistentTopic) topic;
