This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 88000dac2 [ISSUE #6196] Update lastConsumeTimestamp and lastPullTimestamp in DefaultLitePullConsumer (#6197)
88000dac2 is described below

commit 88000dac212a0c721a99ca167132920ce0f45f1b
Author: rongtong <[email protected]>
AuthorDate: Tue Feb 28 13:58:21 2023 +0800

    [ISSUE #6196] Update lastConsumeTimestamp and lastPullTimestamp in DefaultLitePullConsumer (#6197)
    
    * Update lastConsumeTimestamp and lastPullTimestamp in DefaultLitePullConsumer
    
    * Add lastConsumeTimestamp and lastPullTimestamp in consumerRunningInfo for DefaultLitePullConsumer
    
    * Pass the check style
---
 .../client/impl/consumer/AssignedMessageQueue.java |  4 ---
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 31 +++++++++++++++++-----
 2 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index 5f89c3c6c..a57cb53b4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -37,10 +37,6 @@ public class AssignedMessageQueue {
         this.rebalanceImpl = rebalanceImpl;
     }
 
-    public Set<MessageQueue> messageQueues() {
-        return assignedMessageQueueState.keySet();
-    }
-
     public boolean isPaused(MessageQueue messageQueue) {
         MessageQueueState messageQueueState = 
assignedMessageQueueState.get(messageQueue);
         if (messageQueueState != null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index c6631cb5e..e5aed64d3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -69,6 +69,7 @@ import 
org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo;
 import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
@@ -158,7 +159,8 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
     private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new 
ArrayList<>();
 
     // only for test purpose, will be modified by reflection in unit test.
-    @SuppressWarnings("FieldMayBeFinal") private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;
+    @SuppressWarnings("FieldMayBeFinal")
+    private static boolean doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;
 
     public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer 
defaultLitePullConsumer, final RPCHook rpcHook) {
         this.defaultLitePullConsumer = defaultLitePullConsumer;
@@ -394,7 +396,7 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
         }
         // If assign function invoke before start function, then update pull 
task after initialization.
         if (subscriptionType == SubscriptionType.ASSIGN) {
-            updateAssignPullTask(assignedMessageQueue.messageQueues());
+            updateAssignPullTask(assignedMessageQueue.getAssignedMessageQueues());
         }
 
         for (String topic : topicMessageQueueChangeListenerMap.keySet()) {
@@ -484,12 +486,14 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
 
     /**
      * subscribe data by customizing messageQueueListener
+     *
      * @param topic
      * @param subExpression
      * @param messageQueueListener
      * @throws MQClientException
      */
-    public synchronized void subscribe(String topic, String subExpression, MessageQueueListener messageQueueListener) throws MQClientException {
+    public synchronized void subscribe(String topic, String subExpression,
+        MessageQueueListener messageQueueListener) throws MQClientException {
         try {
             if (StringUtils.isEmpty(topic)) {
                 throw new IllegalArgumentException("Topic can not be null or 
empty.");
@@ -516,7 +520,6 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
         }
     }
 
-
     public synchronized void subscribe(String topic, String subExpression) 
throws MQClientException {
         try {
             if (topic == null || "".equals(topic)) {
@@ -637,6 +640,7 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
                     consumeMessageContext.setSuccess(true);
                     this.executeHookAfter(consumeMessageContext);
                 }
+                consumeRequest.getProcessQueue().setLastConsumeTimestamp(System.currentTimeMillis());
                 return messages;
             }
         } catch (InterruptedException ignore) {
@@ -655,7 +659,7 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
     }
 
     public synchronized void seek(MessageQueue messageQueue, long offset) 
throws MQClientException {
-        if (!assignedMessageQueue.messageQueues().contains(messageQueue)) {
+        if (!assignedMessageQueue.getAssignedMessageQueues().contains(messageQueue)) {
             if (subscriptionType == SubscriptionType.SUBSCRIBE) {
                 throw new MQClientException("The message queue is not in 
assigned list, may be rebalancing, message queue: " + messageQueue, null);
             } else {
@@ -721,7 +725,7 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
     }
 
     public synchronized void commitAll() {
-        for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
+        for (MessageQueue messageQueue : assignedMessageQueue.getAssignedMessageQueues()) {
             try {
                 commit(messageQueue);
             } catch (Exception e) {
@@ -732,6 +736,7 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
 
     /**
      * Specify offset commit
+     *
      * @param messageQueues
      * @param persist
      */
@@ -760,6 +765,7 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
 
     /**
      * Get the queue assigned in subscribe mode
+     *
      * @return
      */
     public synchronized Set<MessageQueue> assignment() {
@@ -895,6 +901,8 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
                     return;
                 }
 
+                processQueue.setLastPullTimestamp(System.currentTimeMillis());
+
                 if ((long) consumeRequestCache.size() * 
defaultLitePullConsumer.getPullBatchSize() > 
defaultLitePullConsumer.getPullThresholdForAll()) {
                     scheduledThreadPoolExecutor.schedule(this, 
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                     if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
@@ -1172,6 +1180,15 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
         info.setProperties(prop);
 
         info.getSubscriptionSet().addAll(this.subscriptions());
+
+        for (MessageQueue mq : this.assignedMessageQueue.getAssignedMessageQueues()) {
+            ProcessQueue pq = this.assignedMessageQueue.getProcessQueue(mq);
+            ProcessQueueInfo pqInfo = new ProcessQueueInfo();
+            pqInfo.setCommitOffset(this.offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE));
+            pq.fillProcessQueueInfo(pqInfo);
+            info.getMqTable().put(mq, pqInfo);
+        }
+
         return info;
     }
 
@@ -1234,7 +1251,7 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
     }
 
     public synchronized void registerTopicMessageQueueChangeListener(String 
topic,
-                                                                     TopicMessageQueueChangeListener listener) throws MQClientException {
+        TopicMessageQueueChangeListener listener) throws MQClientException {
         if (topic == null || listener == null) {
             throw new MQClientException("Topic or listener is null", null);
         }

Reply via email to