This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new a4d63f2  Polish the part of ProcessQueue#doStats (#103)
a4d63f2 is described below

commit a4d63f29af80e7b1eb5c62abf5f8648f0723b110
Author: Aaron Ai <[email protected]>
AuthorDate: Sun Jul 31 12:22:19 2022 +0800

    Polish the part of ProcessQueue#doStats (#103)
---
 .../rocketmq/client/java/impl/consumer/ProcessQueue.java |  5 +++++
 .../client/java/impl/consumer/ProcessQueueImpl.java      | 16 ++++++++++++++++
 .../client/java/impl/consumer/PushConsumerImpl.java      | 12 +++---------
 3 files changed, 24 insertions(+), 9 deletions(-)

diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
index dce7aee..19db00e 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
@@ -153,4 +153,9 @@ public interface ProcessQueue {
      * @return bytes of cached message memory footprint.
      */
     long getCachedMessageBytes();
+
+    /**
+     * Do some stats work.
+     */
+    void doStats();
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index d9a36a0..5cc55e6 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -102,6 +102,9 @@ class ProcessQueueImpl implements ProcessQueue {
 
     private final AtomicLong cachedMessagesBytes;
 
+    private final AtomicLong receptionTimes;
+    private final AtomicLong receivedMessagesQuantity;
+
     private volatile long activityNanoTime = System.nanoTime();
 
     public ProcessQueueImpl(PushConsumerImpl consumer, MessageQueueImpl mq, 
FilterExpression filterExpression) {
@@ -114,6 +117,8 @@ class ProcessQueueImpl implements ProcessQueue {
         this.inflightMessages = new ArrayList<>();
         this.inflightMessagesLock = new ReentrantReadWriteLock();
         this.cachedMessagesBytes = new AtomicLong();
+        this.receptionTimes = new AtomicLong(0);
+        this.receivedMessagesQuantity = new AtomicLong(0);
     }
 
     @Override
@@ -273,6 +278,7 @@ class ProcessQueueImpl implements ProcessQueue {
                     onReceiveMessageException(t);
                 }
             }, MoreExecutors.directExecutor());
+            receptionTimes.getAndIncrement();
             consumer.getReceptionTimes().getAndIncrement();
         } catch (Throwable t) {
             LOGGER.error("Exception raised during message reception, mq={}, 
clientId={}", mq, consumer.clientId(), t);
@@ -328,6 +334,7 @@ class ProcessQueueImpl implements ProcessQueue {
         final List<MessageViewImpl> messages = result.getMessageViewImpls();
         if (!messages.isEmpty()) {
             cacheMessages(messages);
+            receivedMessagesQuantity.getAndAdd(messages.size());
             consumer.getReceivedMessagesQuantity().getAndAdd(messages.size());
             consumer.getConsumeService().signal();
         }
@@ -710,4 +717,13 @@ class ProcessQueueImpl implements ProcessQueue {
     public long getCachedMessageBytes() {
         return cachedMessagesBytes.get();
     }
+
+    public void doStats() {
+        final long receptionTimes = this.receptionTimes.getAndSet(0);
+        final long receivedMessagesQuantity = 
this.receivedMessagesQuantity.getAndSet(0);
+        LOGGER.info("Process queue stats: clientId={}, mq={}, 
receptionTimes={}, receivedMessageQuantity={}, "
+                + "pendingMessageCount={}, inflightMessageCount={}, 
cachedMessageBytes={}", consumer.clientId(), mq,
+            receptionTimes, receivedMessagesQuantity, 
this.getPendingMessageCount(), this.getInflightMessageCount(),
+            this.getCachedMessageBytes());
+    }
 }
diff --git 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 86bef80..e98cd55 100644
--- 
a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ 
b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -587,15 +587,9 @@ class PushConsumerImpl extends ConsumerImpl implements 
PushConsumer, MessageCach
         final long consumptionErrorQuantity = 
this.consumptionErrorQuantity.getAndSet(0);
 
         LOGGER.info("clientId={}, consumerGroup={}, receptionTimes={}, 
receivedMessagesQuantity={}, "
-                + "consumptionOkQuantity={}, consumptionErrorQuantity={}",
-            clientId, consumerGroup, receptionTimes, receivedMessagesQuantity, 
consumptionOkQuantity,
-            consumptionErrorQuantity);
-        for (ProcessQueue pq : processQueueTable.values()) {
-            LOGGER.info("Process queue stats: clientId={}, mq={}, 
pendingMessageCount={}, inflightMessageCount={}, "
-                    + "cachedMessageBytes={}",
-                clientId, pq.getMessageQueue(), pq.getPendingMessageCount(), 
pq.getInflightMessageCount(),
-                pq.getCachedMessageBytes());
-        }
+                + "consumptionOkQuantity={}, consumptionErrorQuantity={}", 
clientId, consumerGroup, receptionTimes,
+            receivedMessagesQuantity, consumptionOkQuantity, 
consumptionErrorQuantity);
+        processQueueTable.values().forEach(ProcessQueue::doStats);
     }
 
     public RetryPolicy getRetryPolicy() {

Reply via email to