This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c1e176e7db6fbda83970b3cde3b104b37c8f14c3 Author: Yong Zhang <[email protected]> AuthorDate: Tue Aug 9 10:57:20 2022 +0800 [improve][broker]Remove unnecessary lock on the stats thread (#16983) --- *Motivation* We found that the pulsar-ordered executor and the pulsar-stats-updater executor were blocking each other. The pulsar-ordered executor is calling `createManagedLedgerOffloader`, and the pulsar-stats-updater is getting the compactor. Both of them want to acquire the lock. We previously made an improvement to `createManagedLedgerOffloader`: https://github.com/apache/pulsar/pull/15883 We are using `getCompactor(false)` for the stats-related operations. The `getCompactor` method is guarded by `synchronized`. However, the stats code only needs to get the current compactor without initializing it, so we don't need `synchronized` to guard this operation. *Modification* Remove the unnecessary `synchronized` from the stats path: replace the synchronized `getCompactor(boolean)` overload with an unsynchronized `getNullableCompactor` method that returns the current compactor, which may be null. (cherry picked from commit 4d5ecba9394515e7dbf19fd01739c1e1dc90e5ec) --- .../main/java/org/apache/pulsar/broker/PulsarService.java | 13 ++++++++----- .../org/apache/pulsar/broker/service/BrokerService.java | 9 +++------ .../pulsar/broker/service/persistent/PersistentTopic.java | 7 +------ .../broker/stats/prometheus/NamespaceStatsAggregator.java | 8 +------- .../apache/pulsar/broker/stats/PrometheusMetricsTest.java | 2 +- 5 files changed, 14 insertions(+), 25 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 760cc83b5d6..459b1d9dc67 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1329,16 +1329,19 @@ public class PulsarService implements AutoCloseable { } public synchronized Compactor getCompactor() throws PulsarServerException { - return getCompactor(true); - } - - public synchronized Compactor 
getCompactor(boolean shouldInitialize) throws PulsarServerException { - if (this.compactor == null && shouldInitialize) { + if (this.compactor == null) { this.compactor = newCompactor(); } return this.compactor; } + // This method is used for metrics, which is allowed to as null + // Because it's no operation on the compactor, so let's remove the synchronized on this method + // to avoid unnecessary lock competition. + public Compactor getNullableCompactor() { + return this.compactor; + } + protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) { if (this.offloaderScheduler == null) { this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index e91e462f7df..34b126d6104 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1877,12 +1877,9 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies } topics.remove(topic); - try { - Compactor compactor = pulsar.getCompactor(false); - if (compactor != null) { - compactor.getStats().removeTopic(topic); - } - } catch (PulsarServerException ignore) { + Compactor compactor = pulsar.getNullableCompactor(); + if (compactor != null) { + compactor.getStats().removeTopic(topic); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 19706585531..f8ea56e2c2b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1967,12 +1967,7 @@ public class 
PersistentTopic extends AbstractTopic } private Optional<CompactorMXBean> getCompactorMXBean() { - Compactor compactor = null; - try { - compactor = brokerService.pulsar().getCompactor(false); - } catch (PulsarServerException ex) { - log.warn("get compactor error", ex); - } + Compactor compactor = brokerService.pulsar().getNullableCompactor(); return Optional.ofNullable(compactor).map(c -> c.getStats()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index 2a21a0b402a..945725f4b77 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl; -import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -100,12 +99,7 @@ public class NamespaceStatsAggregator { } private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) { - Compactor compactor = null; - try { - compactor = pulsar.getCompactor(false); - } catch (PulsarServerException e) { - log.error("get compactor error", e); - } + Compactor compactor = pulsar.getNullableCompactor(); return Optional.ofNullable(compactor).map(c -> c.getStats()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index c0383cd6e9f..df765d65295 100644 --- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -1220,7 +1220,7 @@ public class PrometheusMetricsTest extends BrokerTestBase { } ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build()); - Compactor compactor = pulsar.getCompactor(true); + Compactor compactor = pulsar.getCompactor(); compactor.compact(topicName).get(); statsOut = new ByteArrayOutputStream(); PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
