This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 88000dac2 [ISSUE #6196] Update lastConsumeTimestamp and
lastPullTimestamp in DefaultLitePullConsumer (#6197)
88000dac2 is described below
commit 88000dac212a0c721a99ca167132920ce0f45f1b
Author: rongtong <[email protected]>
AuthorDate: Tue Feb 28 13:58:21 2023 +0800
[ISSUE #6196] Update lastConsumeTimestamp and lastPullTimestamp in
DefaultLitePullConsumer (#6197)
* Update lastConsumeTimestamp and lastPullTimestamp in
DefaultLitePullConsumer
* Add lastConsumeTimestamp and lastPullTimestamp in consumerRunningInfo for
DefaultLitePullConsumer
* Pass the check style
---
.../client/impl/consumer/AssignedMessageQueue.java | 4 ---
.../impl/consumer/DefaultLitePullConsumerImpl.java | 31 +++++++++++++++++-----
2 files changed, 24 insertions(+), 11 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index 5f89c3c6c..a57cb53b4 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -37,10 +37,6 @@ public class AssignedMessageQueue {
this.rebalanceImpl = rebalanceImpl;
}
- public Set<MessageQueue> messageQueues() {
- return assignedMessageQueueState.keySet();
- }
-
public boolean isPaused(MessageQueue messageQueue) {
MessageQueueState messageQueueState =
assignedMessageQueueState.get(messageQueue);
if (messageQueueState != null) {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index c6631cb5e..e5aed64d3 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -69,6 +69,7 @@ import
org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
@@ -158,7 +159,8 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new
ArrayList<>();
// only for test purpose, will be modified by reflection in unit test.
- @SuppressWarnings("FieldMayBeFinal") private static boolean
doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;
+ @SuppressWarnings("FieldMayBeFinal")
+ private static boolean
doNotUpdateTopicSubscribeInfoWhenSubscriptionChanged = false;
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer
defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
@@ -394,7 +396,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
// If assign function invoke before start function, then update pull
task after initialization.
if (subscriptionType == SubscriptionType.ASSIGN) {
- updateAssignPullTask(assignedMessageQueue.messageQueues());
+
updateAssignPullTask(assignedMessageQueue.getAssignedMessageQueues());
}
for (String topic : topicMessageQueueChangeListenerMap.keySet()) {
@@ -484,12 +486,14 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
/**
* subscribe data by customizing messageQueueListener
+ *
* @param topic
* @param subExpression
* @param messageQueueListener
* @throws MQClientException
*/
- public synchronized void subscribe(String topic, String subExpression,
MessageQueueListener messageQueueListener) throws MQClientException {
+ public synchronized void subscribe(String topic, String subExpression,
+ MessageQueueListener messageQueueListener) throws MQClientException {
try {
if (StringUtils.isEmpty(topic)) {
throw new IllegalArgumentException("Topic can not be null or
empty.");
@@ -516,7 +520,6 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
}
-
public synchronized void subscribe(String topic, String subExpression)
throws MQClientException {
try {
if (topic == null || "".equals(topic)) {
@@ -637,6 +640,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
consumeMessageContext.setSuccess(true);
this.executeHookAfter(consumeMessageContext);
}
+
consumeRequest.getProcessQueue().setLastConsumeTimestamp(System.currentTimeMillis());
return messages;
}
} catch (InterruptedException ignore) {
@@ -655,7 +659,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
public synchronized void seek(MessageQueue messageQueue, long offset)
throws MQClientException {
- if (!assignedMessageQueue.messageQueues().contains(messageQueue)) {
+ if
(!assignedMessageQueue.getAssignedMessageQueues().contains(messageQueue)) {
if (subscriptionType == SubscriptionType.SUBSCRIBE) {
throw new MQClientException("The message queue is not in
assigned list, may be rebalancing, message queue: " + messageQueue, null);
} else {
@@ -721,7 +725,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
public synchronized void commitAll() {
- for (MessageQueue messageQueue : assignedMessageQueue.messageQueues())
{
+ for (MessageQueue messageQueue :
assignedMessageQueue.getAssignedMessageQueues()) {
try {
commit(messageQueue);
} catch (Exception e) {
@@ -732,6 +736,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
/**
* Specify offset commit
+ *
* @param messageQueues
* @param persist
*/
@@ -760,6 +765,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
/**
* Get the queue assigned in subscribe mode
+ *
* @return
*/
public synchronized Set<MessageQueue> assignment() {
@@ -895,6 +901,8 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
return;
}
+ processQueue.setLastPullTimestamp(System.currentTimeMillis());
+
if ((long) consumeRequestCache.size() *
defaultLitePullConsumer.getPullBatchSize() >
defaultLitePullConsumer.getPullThresholdForAll()) {
scheduledThreadPoolExecutor.schedule(this,
PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
@@ -1172,6 +1180,15 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
info.setProperties(prop);
info.getSubscriptionSet().addAll(this.subscriptions());
+
+ for (MessageQueue mq :
this.assignedMessageQueue.getAssignedMessageQueues()) {
+ ProcessQueue pq = this.assignedMessageQueue.getProcessQueue(mq);
+ ProcessQueueInfo pqInfo = new ProcessQueueInfo();
+ pqInfo.setCommitOffset(this.offsetStore.readOffset(mq,
ReadOffsetType.MEMORY_FIRST_THEN_STORE));
+ pq.fillProcessQueueInfo(pqInfo);
+ info.getMqTable().put(mq, pqInfo);
+ }
+
return info;
}
@@ -1234,7 +1251,7 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
public synchronized void registerTopicMessageQueueChangeListener(String
topic,
-
TopicMessageQueueChangeListener listener) throws MQClientException {
+ TopicMessageQueueChangeListener listener) throws MQClientException {
if (topic == null || listener == null) {
throw new MQClientException("Topic or listener is null", null);
}