This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2cb1681ba3e9fcacbe4214bf4c0dc6d08054a545 Author: JiangHaiting <[email protected]> AuthorDate: Wed Mar 16 11:37:36 2022 +0800 [Broker] Fix precision issue and initial value for Consumer#avgMessagesPerEntry (#14666) ### Motivation 1. Precision issue There is precision issue to use int type for `Consumer#avgMessagesPerEntry`. ``` tmpAvgMessagesPerEntry = (int) Math.floor(tmpAvgMessagesPerEntry * avgPercent + (1 - avgPercent) * totalMessages / entries.size()); ``` For example, if `tmpAvgMessagesPerEntry` = 1 and new value of `totalMessages / entries.size()` is always 5, then the `tmpAvgMessagesPerEntry` is always 1 and never increase. 2. Initial value issue. And the init value of 1000 seems confusing in consumerStats for users, and it need quite a long time to decrease if message rate is very slow. ### Modifications 1. Change type of avgMessagesPerEntry to double. 2. Change init value from 1000 to first `totalMessages / entries.size()`. (cherry picked from commit de2e6c8ce5821b945cbdd0cb2969234667b017ec) --- .../org/apache/pulsar/broker/service/Consumer.java | 28 ++++++++++++---------- .../PersistentDispatcherMultipleConsumers.java | 3 ++- .../PersistentDispatcherSingleActiveConsumer.java | 2 +- 3 files changed, 19 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index bf906a3..591b71c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkArgument; import com.google.common.base.MoreObjects; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.AtomicDouble; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; import java.util.ArrayList; @@ -116,12 +117,10 @@ public class Consumer { /** * It starts keep tracking the average messages per entry. - * The initial value is 1000, when new value comes, it will update with + * The initial value is 0, when new value comes, it will update with * avgMessagesPerEntry = avgMessagePerEntry * avgPercent + (1 - avgPercent) * new Value. */ - private static final AtomicIntegerFieldUpdater<Consumer> AVG_MESSAGES_PER_ENTRY = - AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "avgMessagesPerEntry"); - private volatile int avgMessagesPerEntry = 1000; + private final AtomicDouble avgMessagesPerEntry = new AtomicDouble(0); private static final long [] EMPTY_ACK_SET = new long[0]; private static final double avgPercent = 0.9; @@ -163,7 +162,6 @@ public class Consumer { PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0); MESSAGE_PERMITS_UPDATER.set(this, 0); UNACKED_MESSAGES_UPDATER.set(this, 0); - AVG_MESSAGES_PER_ENTRY.set(this, 1000); this.metadata = metadata != null ? metadata : Collections.emptyMap(); @@ -265,10 +263,13 @@ public class Consumer { } // calculate avg message per entry - int tmpAvgMessagesPerEntry = AVG_MESSAGES_PER_ENTRY.get(this); - tmpAvgMessagesPerEntry = (int) Math.floor(tmpAvgMessagesPerEntry * avgPercent - + (1 - avgPercent) * totalMessages / entries.size()); - AVG_MESSAGES_PER_ENTRY.set(this, tmpAvgMessagesPerEntry); + if (avgMessagesPerEntry.get() < 1) { //valid avgMessagesPerEntry should always >= 1 + // set init value. + avgMessagesPerEntry.set(1.0 * totalMessages / entries.size()); + } else { + avgMessagesPerEntry.set(avgMessagesPerEntry.get() * avgPercent + + (1 - avgPercent) * totalMessages / entries.size()); + } // reduce permit and increment unackedMsg count with total number of messages in batch-msgs int ackedCount = batchIndexesAcks == null ? 0 : batchIndexesAcks.getTotalAckedIndexCount(); @@ -276,7 +277,7 @@ public class Consumer { if (log.isDebugEnabled()){ log.debug("[{}-{}] Added {} minus {} messages to MESSAGE_PERMITS_UPDATER in broker.service.Consumer" + " for consumerId: {}; avgMessagesPerEntry is {}", - topicName, subscription, ackedCount, totalMessages, consumerId, tmpAvgMessagesPerEntry); + topicName, subscription, ackedCount, totalMessages, consumerId, avgMessagesPerEntry.get()); } incrementUnackedMessages(unackedMessages); msgOut.recordMultipleEvents(totalMessages, totalBytes); @@ -683,8 +684,11 @@ public class Consumer { return MESSAGE_PERMITS_UPDATER.get(this); } + /** + * return 0 if there is no entry dispatched yet. + */ public int getAvgMessagesPerEntry() { - return AVG_MESSAGES_PER_ENTRY.get(this); + return (int) Math.round(avgMessagesPerEntry.get()); } public boolean isBlocked() { @@ -729,7 +733,7 @@ public class Consumer { } unackedMessages = consumerStats.unackedMessages; blockedConsumerOnUnackedMsgs = consumerStats.blockedConsumerOnUnackedMsgs; - AVG_MESSAGES_PER_ENTRY.set(this, consumerStats.avgMessagesPerEntry); + avgMessagesPerEntry.set(consumerStats.avgMessagesPerEntry); } public ConsumerStatsImpl getStats() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 0d06005..a8a36cd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -290,8 +290,9 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul Consumer c = getRandomConsumer(); // if turn on precise dispatcher flow control, adjust the record to read if (c != null && c.isPreciseDispatcherFlowControl()) { + int avgMessagesPerEntry = Math.max(1, c.getAvgMessagesPerEntry()); messagesToRead = Math.min( - (int) Math.ceil(currentTotalAvailablePermits * 1.0 / c.getAvgMessagesPerEntry()), + (int) Math.ceil(currentTotalAvailablePermits * 1.0 / avgMessagesPerEntry), readBatchSize); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 6161847..6900a54 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -369,7 +369,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes(); // if turn of precise dispatcher flow control, adjust the records to read if (consumer.isPreciseDispatcherFlowControl()) { - int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry(); + int avgMessagesPerEntry = Math.max(1, consumer.getAvgMessagesPerEntry()); messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize); }
