This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fd5bc9d688fa1c9f3f918626b3e12f896de511be Author: Jiwei Guo <[email protected]> AuthorDate: Thu Mar 10 21:46:35 2022 +0800 Add log to track negative unacked msg. (#14501) (cherry picked from commit 82bbb2c03a4ae14c2bccab183d665ad3248d4184) --- .../src/main/java/org/apache/pulsar/broker/service/Consumer.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 3cf4cac..c30b991 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -138,6 +138,8 @@ public class Consumer { @Setter private volatile long consumerEpoch; + private long negtiveUnackedMsgsTimestamp; + public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, int priorityLevel, String consumerName, boolean isDurable, TransportCnx cnx, String appId, @@ -947,6 +949,10 @@ public class Consumer { subscription.addUnAckedMessages(ackedMessages); unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages); } + if (unackedMsgs < 0 && System.currentTimeMillis() - negtiveUnackedMsgsTimestamp >= 10_000) { + negtiveUnackedMsgsTimestamp = System.currentTimeMillis(); + log.warn("unackedMsgs is : {}, ackedMessages : {}, consumer : {}", unackedMsgs, ackedMessages, consumer); + } return unackedMsgs; }
