lhotari commented on a change in pull request #11372:
URL: https://github.com/apache/pulsar/pull/11372#discussion_r675571542
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
##########
@@ -29,17 +30,20 @@
protected volatile long publishMaxByteRate = 0;
protected volatile boolean publishThrottlingEnabled = false;
// precise mode for publish rate limiter
- private RateLimiter topicPublishRateLimiterOnMessage;
- private RateLimiter topicPublishRateLimiterOnByte;
+ private volatile HashMap<String, RateLimiter> rateLimiters;
Review comment:
What is the reason for using a HashMap to store the RateLimiter
instances?
This is problematic from a thread-safety perspective. `volatile` on a
java.util.HashMap field only guarantees that the reference is visible across
threads; it does not make operations on the map itself thread-safe.
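To illustrate the point above, here is a minimal sketch of two common ways to share a map of limiters safely between threads. The `LimiterRegistry` and `Limiter` names are hypothetical stand-ins and are not part of the PR or of Pulsar.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch; none of these names come from the PR.
final class LimiterRegistry {
    // Stand-in for a rate limiter instance; immutable for simplicity.
    static final class Limiter {
        final long permits;

        Limiter(long permits) {
            this.permits = permits;
        }
    }

    // Option 1: a final field holding a ConcurrentHashMap.
    // Individual put/get calls are thread-safe without extra locking.
    private final Map<String, Limiter> limiters = new ConcurrentHashMap<>();

    // Option 2: a volatile reference to an unmodifiable snapshot.
    // Here volatile is sufficient because the map is never mutated after
    // publication; updates replace the whole reference.
    private volatile Map<String, Limiter> snapshot = Collections.emptyMap();

    void put(String key, long permits) {
        limiters.put(key, new Limiter(permits));
    }

    Limiter get(String key) {
        return limiters.get(key);
    }

    void replaceSnapshot(Map<String, Limiter> next) {
        // Copy defensively so later changes to 'next' cannot leak in.
        snapshot = Collections.unmodifiableMap(new HashMap<>(next));
    }

    Map<String, Limiter> snapshot() {
        return snapshot;
    }
}
```

With only two limiters that are known up front, plain fields may well be simpler than either option.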
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java
##########
@@ -59,7 +59,7 @@ public void update(PublishRate maxPublishRate) {
@Override
public boolean tryAcquire(int numbers, long bytes) {
// No-op
- return false;
+ return true;
Review comment:
This problem has been reported as #10603 and it's fixed by PR #11442.
##########
File path:
pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java
##########
@@ -203,7 +203,7 @@ public synchronized boolean tryAcquire(long acquirePermit) {
*
* @return returns 0 if permits is not available
*/
- public synchronized long getAvailablePermits() {
+ public long getAvailablePermits() {
Review comment:
Many methods in RateLimiter currently use `synchronized`. Does this
change address an actual problem?
The design of RateLimiter isn't optimal, but changing only this method makes
the implementation inconsistent. I think it would be better to first clarify the
issue with using synchronization and then redesign the RateLimiter
implementation with minimal use of blocking methods and synchronization.
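As a rough illustration of what "minimal use of synchronization" could look like, here is a sketch of a permit counter built on `AtomicLong` with a compare-and-set loop. The `LockFreePermits` class is hypothetical; it is not Pulsar's RateLimiter and omits the scheduled renewal and blocking acquire that the real class provides.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a non-blocking permit counter; not Pulsar's RateLimiter.
final class LockFreePermits {
    private final long maxPermits;
    // Number of permits handed out in the current period.
    private final AtomicLong acquired = new AtomicLong();

    LockFreePermits(long maxPermits) {
        this.maxPermits = maxPermits;
    }

    // Tries to take n permits without blocking or locking.
    boolean tryAcquire(long n) {
        while (true) {
            long current = acquired.get();
            if (current + n > maxPermits) {
                return false; // not enough permits left in this period
            }
            if (acquired.compareAndSet(current, current + n)) {
                return true;
            }
            // Another thread changed the counter; retry with the fresh value.
        }
    }

    // Reads are consistent with writes because both go through the AtomicLong.
    long getAvailablePermits() {
        return Math.max(0, maxPermits - acquired.get());
    }

    // Assumed to be called by a periodic task at the start of each period.
    void renew() {
        acquired.set(0);
    }
}
```

In this shape every method reads and writes the same atomic variable, so no method needs `synchronized` and none is inconsistent with the others.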
##########
File path:
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java
##########
@@ -63,6 +67,14 @@ public boolean isPublishRateExceeded() {
return false;
}
+ private void releaseThrottle() {
+ for (RateLimiter rateLimiter : rateLimiters.values()) {
+ if (rateLimiter.getAvailablePermits() <= 0) {
+ return;
+ }
+ }
+ this.rateLimitFunction.apply();
+ }
Review comment:
Good catch! It seems that this logic is the "beef" of this PR and
resolves the issue.
Would it be possible to create a minimal PR with just these changes, without
switching to a dynamic HashMap to store the rate limiters? Just use the
`topicPublishRateLimiterOnMessage` and `topicPublishRateLimiterOnByte` fields
directly.
Please think of a better name for the method; `releaseThrottle` isn't very
descriptive. Please also add comments to explain the logic.
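A minimal sketch of what that could look like, assuming two directly held limiters, either of which may be unset. The class name, the method name `resumeIfAllLimitersHavePermits`, the `LongSupplier` stand-ins for `getAvailablePermits()`, and the `Runnable` stand-in for `rateLimitFunction.apply()` are all hypothetical and only illustrate the shape of the logic.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch; the real change would live in PrecisPublishLimiter.
final class ThrottleReleaseSketch {
    // Stand-in for topicPublishRateLimiterOnMessage.getAvailablePermits(); may be null.
    private final LongSupplier messagePermits;
    // Stand-in for topicPublishRateLimiterOnByte.getAvailablePermits(); may be null.
    private final LongSupplier bytePermits;
    // Stand-in for rateLimitFunction.apply().
    private final Runnable resumePublishing;

    ThrottleReleaseSketch(LongSupplier messagePermits, LongSupplier bytePermits,
                          Runnable resumePublishing) {
        this.messagePermits = messagePermits;
        this.bytePermits = bytePermits;
        this.resumePublishing = resumePublishing;
    }

    // Resumes publishing only when every configured limiter has permits again.
    // If either limiter is still exhausted, the topic stays throttled until
    // that limiter's own renewal triggers this check again.
    void resumeIfAllLimitersHavePermits() {
        if (isExhausted(messagePermits) || isExhausted(bytePermits)) {
            return;
        }
        resumePublishing.run();
    }

    // An unset limiter (null) never blocks the release.
    private static boolean isExhausted(LongSupplier permits) {
        return permits != null && permits.getAsLong() <= 0;
    }
}
```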