This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2cb1681ba3e9fcacbe4214bf4c0dc6d08054a545
Author: JiangHaiting <[email protected]>
AuthorDate: Wed Mar 16 11:37:36 2022 +0800

    [Broker] Fix precision issue and initial value for 
Consumer#avgMessagesPerEntry (#14666)
    
    ### Motivation
    
    1. Precision issue
    There is precision issue to use int type for `Consumer#avgMessagesPerEntry`.
    ```
    tmpAvgMessagesPerEntry = (int) Math.floor(tmpAvgMessagesPerEntry * 
avgPercent
                    + (1 - avgPercent) * totalMessages / entries.size());
    ```
    
    For example, if `tmpAvgMessagesPerEntry` = 1 and new value of  
`totalMessages / entries.size()` is always 5,  then the 
`tmpAvgMessagesPerEntry` is always 1 and never increase.
    
    2.  Initial value issue.
    And the init value of 1000 seems confusing in consumerStats for users, and 
it need quite a long time to decrease if message rate is very slow.
    
    ### Modifications
    
    1. Change type of avgMessagesPerEntry to double.
    2. Change init value from 1000 to first `totalMessages / entries.size()`.
    
    (cherry picked from commit de2e6c8ce5821b945cbdd0cb2969234667b017ec)
---
 .../org/apache/pulsar/broker/service/Consumer.java | 28 ++++++++++++----------
 .../PersistentDispatcherMultipleConsumers.java     |  3 ++-
 .../PersistentDispatcherSingleActiveConsumer.java  |  2 +-
 3 files changed, 19 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index bf906a3..591b71c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service;
 import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AtomicDouble;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.Promise;
 import java.util.ArrayList;
@@ -116,12 +117,10 @@ public class Consumer {
 
     /**
      * It starts keep tracking the average messages per entry.
-     * The initial value is 1000, when new value comes, it will update with
+     * The initial value is 0, when new value comes, it will update with
      * avgMessagesPerEntry = avgMessagePerEntry * avgPercent + (1 - 
avgPercent) * new Value.
      */
-    private static final AtomicIntegerFieldUpdater<Consumer> 
AVG_MESSAGES_PER_ENTRY =
-            AtomicIntegerFieldUpdater.newUpdater(Consumer.class, 
"avgMessagesPerEntry");
-    private volatile int avgMessagesPerEntry = 1000;
+    private final AtomicDouble avgMessagesPerEntry = new AtomicDouble(0);
     private static final long [] EMPTY_ACK_SET = new long[0];
 
     private static final double avgPercent = 0.9;
@@ -163,7 +162,6 @@ public class Consumer {
         PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0);
         MESSAGE_PERMITS_UPDATER.set(this, 0);
         UNACKED_MESSAGES_UPDATER.set(this, 0);
-        AVG_MESSAGES_PER_ENTRY.set(this, 1000);
 
         this.metadata = metadata != null ? metadata : Collections.emptyMap();
 
@@ -265,10 +263,13 @@ public class Consumer {
         }
 
         // calculate avg message per entry
-        int tmpAvgMessagesPerEntry = AVG_MESSAGES_PER_ENTRY.get(this);
-        tmpAvgMessagesPerEntry = (int) Math.floor(tmpAvgMessagesPerEntry * 
avgPercent
-                + (1 - avgPercent) * totalMessages / entries.size());
-        AVG_MESSAGES_PER_ENTRY.set(this, tmpAvgMessagesPerEntry);
+        if (avgMessagesPerEntry.get() < 1) { //valid avgMessagesPerEntry 
should always >= 1
+            // set init value.
+            avgMessagesPerEntry.set(1.0 * totalMessages / entries.size());
+        } else {
+            avgMessagesPerEntry.set(avgMessagesPerEntry.get() * avgPercent
+                    + (1 - avgPercent) * totalMessages / entries.size());
+        }
 
         // reduce permit and increment unackedMsg count with total number of 
messages in batch-msgs
         int ackedCount = batchIndexesAcks == null ? 0 : 
batchIndexesAcks.getTotalAckedIndexCount();
@@ -276,7 +277,7 @@ public class Consumer {
         if (log.isDebugEnabled()){
             log.debug("[{}-{}] Added {} minus {} messages to 
MESSAGE_PERMITS_UPDATER in broker.service.Consumer"
                             + " for consumerId: {}; avgMessagesPerEntry is {}",
-                   topicName, subscription, ackedCount, totalMessages, 
consumerId, tmpAvgMessagesPerEntry);
+                   topicName, subscription, ackedCount, totalMessages, 
consumerId, avgMessagesPerEntry.get());
         }
         incrementUnackedMessages(unackedMessages);
         msgOut.recordMultipleEvents(totalMessages, totalBytes);
@@ -683,8 +684,11 @@ public class Consumer {
         return MESSAGE_PERMITS_UPDATER.get(this);
     }
 
+    /**
+     * return 0 if there is no entry dispatched yet.
+     */
     public int getAvgMessagesPerEntry() {
-        return AVG_MESSAGES_PER_ENTRY.get(this);
+        return (int) Math.round(avgMessagesPerEntry.get());
     }
 
     public boolean isBlocked() {
@@ -729,7 +733,7 @@ public class Consumer {
         }
         unackedMessages = consumerStats.unackedMessages;
         blockedConsumerOnUnackedMsgs = 
consumerStats.blockedConsumerOnUnackedMsgs;
-        AVG_MESSAGES_PER_ENTRY.set(this, consumerStats.avgMessagesPerEntry);
+        avgMessagesPerEntry.set(consumerStats.avgMessagesPerEntry);
     }
 
     public ConsumerStatsImpl getStats() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 0d06005..a8a36cd 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -290,8 +290,9 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         Consumer c = getRandomConsumer();
         // if turn on precise dispatcher flow control, adjust the record to 
read
         if (c != null && c.isPreciseDispatcherFlowControl()) {
+            int avgMessagesPerEntry = Math.max(1, c.getAvgMessagesPerEntry());
             messagesToRead = Math.min(
-                    (int) Math.ceil(currentTotalAvailablePermits * 1.0 / 
c.getAvgMessagesPerEntry()),
+                    (int) Math.ceil(currentTotalAvailablePermits * 1.0 / 
avgMessagesPerEntry),
                     readBatchSize);
         }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 6161847..6900a54 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -369,7 +369,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();
         // if turn of precise dispatcher flow control, adjust the records to 
read
         if (consumer.isPreciseDispatcherFlowControl()) {
-            int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
+            int avgMessagesPerEntry = Math.max(1, 
consumer.getAvgMessagesPerEntry());
             messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / 
avgMessagesPerEntry), readBatchSize);
         }
 

Reply via email to