This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-dashboard.git
The following commit(s) were added to refs/heads/master by this push:
new 3d13e4e fix:Failed to find messages older than 3 days using message ID #274 (#275)
3d13e4e is described below
commit 3d13e4e2b86e85353360bc938920c014b84cb83d
Author: Xu Yichi <[email protected]>
AuthorDate: Mon Mar 31 10:45:24 2025 +0800
fix:Failed to find messages older than 3 days using message ID #274 (#275)
---
.../dashboard/service/client/MQAdminExtImpl.java | 58 ++++++++++++----------
1 file changed, 32 insertions(+), 26 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
index 6f5bc25..83143c3 100644
--- a/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
+++ b/src/main/java/org/apache/rocketmq/dashboard/service/client/MQAdminExtImpl.java
@@ -17,29 +17,27 @@
package org.apache.rocketmq.dashboard.service.client;
import com.google.common.base.Throwables;
+import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.PlainAccessConfig;
import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
+import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
+import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.message.MessageRequestMode;
-import org.apache.rocketmq.dashboard.util.JsonUtil;
-import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-import org.apache.rocketmq.remoting.exception.RemotingConnectException;
-import org.apache.rocketmq.remoting.exception.RemotingException;
-import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
-import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
-import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
-import org.apache.rocketmq.remoting.protocol.admin.RollbackStats;
-import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
@@ -66,20 +64,23 @@ import org.apache.rocketmq.remoting.protocol.route.TopicRouteData;
import
org.apache.rocketmq.remoting.protocol.statictopic.TopicQueueMappingDetail;
import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+import org.apache.rocketmq.dashboard.util.JsonUtil;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.admin.api.BrokerOperatorResult;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.apache.rocketmq.tools.admin.common.AdminToolResult;
+import org.joor.Reflect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
-import java.io.UnsupportedEncodingException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
import static
org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
@Service
@@ -461,18 +462,23 @@ public class MQAdminExtImpl implements MQAdminExt {
logger.info("MessageClientIDSetter.getNearlyTimeFromID(msgId)={} msgId={}", MessageClientIDSetter.getNearlyTimeFromID(msgId), msgId);
try {
return viewMessage(msgId);
+ } catch (Exception e) {
}
- catch (Exception e) {
- }
-
+ MQAdminImpl mqAdminImpl =
MQAdminInstance.threadLocalMqClientInstance().getMQAdminImpl();
Set<String> clusterList =
MQAdminInstance.threadLocalMQAdminExt().getTopicClusterList(topic);
if (clusterList == null || clusterList.isEmpty()) {
- return MQAdminInstance.threadLocalMQAdminExt().queryMessage("",
topic, msgId);
- }
- for (String name : clusterList) {
- MessageExt messageExt =
MQAdminInstance.threadLocalMQAdminExt().queryMessage(name, topic, msgId);
- if (messageExt != null) {
- return messageExt;
+ QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", "", topic, msgId, 32,
+ MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE, true).get();
+ if (qr != null && qr.getMessageList() != null &&
!qr.getMessageList().isEmpty()) {
+ return qr.getMessageList().get(0);
+ }
+ } else {
+ for (String name : clusterList) {
+ QueryResult qr = Reflect.on(mqAdminImpl).call("queryMessage", name, topic, msgId, 32,
+ MessageClientIDSetter.getNearlyTimeFromID(msgId).getTime(), Long.MAX_VALUE, true).get();
+ if (qr != null && qr.getMessageList() != null &&
!qr.getMessageList().isEmpty()) {
+ return qr.getMessageList().get(0);
+ }
}
}
return null;