lhotari commented on code in PR #21681: URL: https://github.com/apache/pulsar/pull/21681#discussion_r1421015281
########## pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java: ########## @@ -16,70 +16,133 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.pulsar.broker.service; -import java.util.concurrent.atomic.LongAdder; +import com.google.common.annotations.VisibleForTesting; +import io.netty.channel.EventLoopGroup; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.LongSupplier; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.util.AsyncTokenBucket; +import org.jctools.queues.MessagePassingQueue; +import org.jctools.queues.MpscUnboundedArrayQueue; public class PublishRateLimiterImpl implements PublishRateLimiter { - protected volatile int publishMaxMessageRate = 0; - protected volatile long publishMaxByteRate = 0; - protected volatile boolean publishThrottlingEnabled = false; - protected volatile boolean publishRateExceeded = false; - protected volatile LongAdder currentPublishMsgCount = new LongAdder(); - protected volatile LongAdder currentPublishByteCount = new LongAdder(); + private volatile AsyncTokenBucket tokenBucketOnMessage; + private volatile AsyncTokenBucket tokenBucketOnByte; + private final LongSupplier clockSource; + + private final MessagePassingQueue<Producer> unthrottlingQueue = new MpscUnboundedArrayQueue<>(1024); + + private final AtomicBoolean unthrottlingScheduled = new AtomicBoolean(false); public PublishRateLimiterImpl(Policies policies, String clusterName) { + this(); update(policies, clusterName); } public PublishRateLimiterImpl(PublishRate maxPublishRate) { + this(); update(maxPublishRate); } + public PublishRateLimiterImpl() { + this(AsyncTokenBucket.DEFAULT_CLOCK_SOURCE); + } + + public PublishRateLimiterImpl(LongSupplier clockSource) { + this.clockSource = clockSource; + } + + /** + * {@inheritDoc} + */ @Override - public void checkPublishRate() { - if (this.publishThrottlingEnabled && !publishRateExceeded) { - if (this.publishMaxByteRate > 0) { - long currentPublishByteRate = this.currentPublishByteCount.sum(); - if (currentPublishByteRate > this.publishMaxByteRate) { - publishRateExceeded = true; - return; - } - } - - if (this.publishMaxMessageRate > 0) { - long currentPublishMsgRate = this.currentPublishMsgCount.sum(); - if (currentPublishMsgRate > this.publishMaxMessageRate) { - publishRateExceeded = true; - } - } + public void handlePublishThrottling(Producer producer, int numOfMessages, + long msgSizeInBytes) { + boolean shouldThrottle = false; + AsyncTokenBucket currentTokenBucketOnMessage = tokenBucketOnMessage; + if (currentTokenBucketOnMessage != null) { + // consume tokens from the token bucket for messages + currentTokenBucketOnMessage.consumeTokens(numOfMessages); + // check if the token bucket contains remaining tokens, if not, we should throttle + shouldThrottle = !currentTokenBucketOnMessage.containsTokens(); + } + AsyncTokenBucket currentTokenBucketOnByte = tokenBucketOnByte; + if (currentTokenBucketOnByte != null) { + // consume tokens from the token bucket for bytes + currentTokenBucketOnByte.consumeTokens(msgSizeInBytes); + // check if the token bucket contains remaining tokens, if not, we should throttle + shouldThrottle = shouldThrottle || !currentTokenBucketOnByte.containsTokens(); + } + if (shouldThrottle) { Review Comment: yes, that's possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org