This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fd5bc9d688fa1c9f3f918626b3e12f896de511be
Author: Jiwei Guo <[email protected]>
AuthorDate: Thu Mar 10 21:46:35 2022 +0800

    Add log to track negtive unacked msg. (#14501)
    
    (cherry picked from commit 82bbb2c03a4ae14c2bccab183d665ad3248d4184)
---
 .../src/main/java/org/apache/pulsar/broker/service/Consumer.java    | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 3cf4cac..c30b991 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -138,6 +138,8 @@ public class Consumer {
     @Setter
     private volatile long consumerEpoch;
 
+    private long negtiveUnackedMsgsTimestamp;
+
     public Consumer(Subscription subscription, SubType subType, String 
topicName, long consumerId,
                     int priorityLevel, String consumerName,
                     boolean isDurable, TransportCnx cnx, String appId,
@@ -947,6 +949,10 @@ public class Consumer {
             subscription.addUnAckedMessages(ackedMessages);
             unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, 
ackedMessages);
         }
+        if (unackedMsgs < 0 && System.currentTimeMillis() - 
negtiveUnackedMsgsTimestamp >= 10_000) {
+            negtiveUnackedMsgsTimestamp = System.currentTimeMillis();
+            log.warn("unackedMsgs is : {}, ackedMessages : {}, consumer : {}", 
unackedMsgs, ackedMessages, consumer);
+        }
         return unackedMsgs;
     }
 

Reply via email to