This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new a4d63f2 Polish the part of ProcessQueue#dostats (#103)
a4d63f2 is described below
commit a4d63f29af80e7b1eb5c62abf5f8648f0723b110
Author: Aaron Ai <[email protected]>
AuthorDate: Sun Jul 31 12:22:19 2022 +0800
Polish the part of ProcessQueue#dostats (#103)
---
.../rocketmq/client/java/impl/consumer/ProcessQueue.java | 5 +++++
.../client/java/impl/consumer/ProcessQueueImpl.java | 16 ++++++++++++++++
.../client/java/impl/consumer/PushConsumerImpl.java | 12 +++---------
3 files changed, 24 insertions(+), 9 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
index dce7aee..19db00e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueue.java
@@ -153,4 +153,9 @@ public interface ProcessQueue {
* @return bytes of cached message memory footprint.
*/
long getCachedMessageBytes();
+
+ /**
+ * Do some stats work.
+ */
+ void doStats();
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index d9a36a0..5cc55e6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -102,6 +102,9 @@ class ProcessQueueImpl implements ProcessQueue {
private final AtomicLong cachedMessagesBytes;
+ private final AtomicLong receptionTimes;
+ private final AtomicLong receivedMessagesQuantity;
+
private volatile long activityNanoTime = System.nanoTime();
public ProcessQueueImpl(PushConsumerImpl consumer, MessageQueueImpl mq,
FilterExpression filterExpression) {
@@ -114,6 +117,8 @@ class ProcessQueueImpl implements ProcessQueue {
this.inflightMessages = new ArrayList<>();
this.inflightMessagesLock = new ReentrantReadWriteLock();
this.cachedMessagesBytes = new AtomicLong();
+ this.receptionTimes = new AtomicLong(0);
+ this.receivedMessagesQuantity = new AtomicLong(0);
}
@Override
@@ -273,6 +278,7 @@ class ProcessQueueImpl implements ProcessQueue {
onReceiveMessageException(t);
}
}, MoreExecutors.directExecutor());
+ receptionTimes.getAndIncrement();
consumer.getReceptionTimes().getAndIncrement();
} catch (Throwable t) {
LOGGER.error("Exception raised during message reception, mq={}, clientId={}", mq, consumer.clientId(), t);
@@ -328,6 +334,7 @@ class ProcessQueueImpl implements ProcessQueue {
final List<MessageViewImpl> messages = result.getMessageViewImpls();
if (!messages.isEmpty()) {
cacheMessages(messages);
+ receivedMessagesQuantity.getAndAdd(messages.size());
consumer.getReceivedMessagesQuantity().getAndAdd(messages.size());
consumer.getConsumeService().signal();
}
@@ -710,4 +717,13 @@ class ProcessQueueImpl implements ProcessQueue {
public long getCachedMessageBytes() {
return cachedMessagesBytes.get();
}
+
+ public void doStats() {
+ final long receptionTimes = this.receptionTimes.getAndSet(0);
+ final long receivedMessagesQuantity = this.receivedMessagesQuantity.getAndSet(0);
+ LOGGER.info("Process queue stats: clientId={}, mq={}, receptionTimes={}, receivedMessageQuantity={}, "
+ + "pendingMessageCount={}, inflightMessageCount={}, cachedMessageBytes={}", consumer.clientId(), mq,
+ receptionTimes, receivedMessagesQuantity, this.getPendingMessageCount(), this.getInflightMessageCount(),
+ this.getCachedMessageBytes());
+ }
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 86bef80..e98cd55 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -587,15 +587,9 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
final long consumptionErrorQuantity =
this.consumptionErrorQuantity.getAndSet(0);
LOGGER.info("clientId={}, consumerGroup={}, receptionTimes={},
receivedMessagesQuantity={}, "
- + "consumptionOkQuantity={}, consumptionErrorQuantity={}",
- clientId, consumerGroup, receptionTimes, receivedMessagesQuantity,
consumptionOkQuantity,
- consumptionErrorQuantity);
- for (ProcessQueue pq : processQueueTable.values()) {
- LOGGER.info("Process queue stats: clientId={}, mq={},
pendingMessageCount={}, inflightMessageCount={}, "
- + "cachedMessageBytes={}",
- clientId, pq.getMessageQueue(), pq.getPendingMessageCount(),
pq.getInflightMessageCount(),
- pq.getCachedMessageBytes());
- }
+ + "consumptionOkQuantity={}, consumptionErrorQuantity={}",
clientId, consumerGroup, receptionTimes,
+ receivedMessagesQuantity, consumptionOkQuantity,
consumptionErrorQuantity);
+ processQueueTable.values().forEach(ProcessQueue::doStats);
}
public RetryPolicy getRetryPolicy() {