This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 7dcca2aaf8d Use dispatchRateLimiterLock to update dispatchRateLimiter.
(#15601)
7dcca2aaf8d is described below
commit 7dcca2aaf8d8cf78544f9373687504a9c0190a11
Author: Jiwei Guo <[email protected]>
AuthorDate: Mon May 16 10:07:47 2022 +0800
Use dispatchRateLimiterLock to update dispatchRateLimiter. (#15601)
### Motivation
https://github.com/apache/pulsar/blob/58c82a71beb7506e422def391af532945be2b7a7/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L377-L399
The object lock may change when execute at line-382, and cause the lock to
become useless.
So use `dispatchRateLimiterLock` to synchronize.
(cherry picked from commit ff4e6000f2d58eff930178ae0c02ef9c5fffb47c)
---
.../org/apache/pulsar/broker/service/persistent/PersistentTopic.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6b1063ec49a..5505c940af6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -184,6 +184,7 @@ public class PersistentTopic extends AbstractTopic
public boolean msgChunkPublished;
private Optional<DispatchRateLimiter> dispatchRateLimiter =
Optional.empty();
+ private final Object dispatchRateLimiterLock = new Object();
private Optional<SubscribeRateLimiter> subscribeRateLimiter =
Optional.empty();
public volatile long delayedDeliveryTickTimeMillis = 1000;
private final long backloggedCursorThresholdEntries;
@@ -376,7 +377,7 @@ public class PersistentTopic extends AbstractTopic
}
private void initializeRateLimiterIfNeeded(Optional<Policies> policies) {
- synchronized (dispatchRateLimiter) {
+ synchronized (dispatchRateLimiterLock) {
// dispatch rate limiter for topic
if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
.isDispatchRateNeeded(brokerService, policies, topic,
Type.TOPIC)) {
@@ -3203,7 +3204,7 @@ public class PersistentTopic extends AbstractTopic
}
private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies
policies) {
- synchronized (dispatchRateLimiter) {
+ synchronized (dispatchRateLimiterLock) {
if (!dispatchRateLimiter.isPresent() && policies.getDispatchRate()
!= null) {
this.dispatchRateLimiter = Optional.of(new
DispatchRateLimiter(this, Type.TOPIC));
}