This is an automated email from the ASF dual-hosted git repository.
daojun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c52af1d5c73 [improve][client] Print consumer stats log if prefetched messages are not zero (#23698)
c52af1d5c73 is described below
commit c52af1d5c733ec05b40fc72e63a202f20d25603b
Author: Penghui Li <[email protected]>
AuthorDate: Thu Dec 12 21:32:20 2024 +0800
[improve][client] Print consumer stats log if prefetched messages are not zero (#23698)
---
.../org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
index 8dfc0af8e1d..5cbbcc44298 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java
@@ -146,15 +146,16 @@ public class ConsumerStatsRecorderImpl implements ConsumerStatsRecorder {
receivedMsgsRate = currentNumMsgsReceived / elapsed;
receivedBytesRate = currentNumBytesReceived / elapsed;
+ int prefetchQueueSize = consumerImpl.incomingMessages.size();
if ((currentNumMsgsReceived | currentNumBytesReceived |
currentNumReceiveFailed | currentNumAcksSent
- | currentNumAcksFailed) != 0) {
+ | currentNumAcksFailed | prefetchQueueSize) != 0) {
log.info(
"[{}] [{}] [{}] Prefetched messages: {} --- "
+ "Consume throughput received: {} msgs/s
--- {} Mbit/s --- "
+ "Ack sent rate: {} ack/s --- " + "Failed
messages: {} --- batch messages: {} ---"
+ "Failed acks: {}",
consumerImpl.getTopic(),
consumerImpl.getSubscription(), consumerImpl.consumerName,
- consumerImpl.incomingMessages.size(),
THROUGHPUT_FORMAT.format(receivedMsgsRate),
+ prefetchQueueSize,
THROUGHPUT_FORMAT.format(receivedMsgsRate),
THROUGHPUT_FORMAT.format(receivedBytesRate * 8 /
1024 / 1024),
THROUGHPUT_FORMAT.format(currentNumAcksSent /
elapsed), currentNumReceiveFailed,
currentNumBatchReceiveFailed,
currentNumAcksFailed);