This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 17634d3317b857b1f784aa59af458da1d3f5206c
Author: YangJodie <[email protected]>
AuthorDate: Sun Aug 21 18:29:59 2022 +0800

    [ISSUE apache#4858] add consumer type check for message direct push
---
 .../client/impl/factory/MQClientInstance.java         |  2 +-
 .../common/protocol/body/ConsumerRunningInfo.java     | 19 ++++++++++---------
 .../tools/command/message/QueryMsgByIdSubCommand.java | 12 +++++++++---
 .../message/QueryMsgByUniqueKeySubCommand.java        | 13 ++++++++++---
 4 files changed, 30 insertions(+), 16 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 56c7edf60..f87a7a84a 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1269,7 +1269,7 @@ public class MQClientInstance {
         final String consumerGroup,
         final String brokerName) {
         MQConsumerInner mqConsumerInner = 
this.consumerTable.get(consumerGroup);
-        if (null != mqConsumerInner) {
+        if (null != mqConsumerInner && mqConsumerInner instanceof 
DefaultMQPushConsumerImpl) {
             DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) 
mqConsumerInner;
 
             ConsumeMessageDirectlyResult result = 
consumer.getConsumeMessageService().consumeMessageDirectly(msg, brokerName);
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
index 6b0355fd8..4a68b25a5 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
@@ -52,15 +52,7 @@ public class ConsumerRunningInfo extends 
RemotingSerializable {
     public static boolean analyzeSubscription(final TreeMap<String/* clientId 
*/, ConsumerRunningInfo> criTable) {
         ConsumerRunningInfo prev = criTable.firstEntry().getValue();
 
-        boolean push = false;
-        {
-            String property = 
prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE);
-
-            if (property == null) {
-                property = ((ConsumeType) 
prev.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name();
-            }
-            push = ConsumeType.valueOf(property) == 
ConsumeType.CONSUME_PASSIVELY;
-        }
+        boolean push = isPushType(prev);
 
         boolean startForAWhile = false;
         {
@@ -103,6 +95,15 @@ public class ConsumerRunningInfo extends 
RemotingSerializable {
         return true;
     }
 
+    public static boolean isPushType(ConsumerRunningInfo consumerRunningInfo) {
+        String property = 
consumerRunningInfo.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE);
+
+        if (property == null) {
+            property = ((ConsumeType) 
consumerRunningInfo.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name();
+        }
+        return ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY;
+    }
+
     public static boolean analyzeRebalance(final TreeMap<String/* clientId */, 
ConsumerRunningInfo> criTable) {
         return true;
     }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
index e118d00f7..ab06aae74 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java
@@ -34,6 +34,7 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -277,9 +278,14 @@ public class QueryMsgByIdSubCommand implements SubCommand {
     private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final 
String consumerGroup, final String clientId,
         final String msgId) {
         try {
-            ConsumeMessageDirectlyResult result =
-                defaultMQAdminExt.consumeMessageDirectly(consumerGroup, 
clientId, msgId);
-            System.out.printf("%s", result);
+            ConsumerRunningInfo consumerRunningInfo = 
defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false, false);
+            if (ConsumerRunningInfo.isPushType(consumerRunningInfo)) {
+                ConsumeMessageDirectlyResult result =
+                        
defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, msgId);
+                System.out.printf("%s", result);
+            } else {
+                System.out.printf("this %s client is not push consumer ,not 
support direct push \n", clientId);
+            }
         } catch (Exception e) {
             e.printStackTrace();
         }
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
index 17abdf218..ebb8eb579 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -180,9 +181,15 @@ public class QueryMsgByUniqueKeySubCommand implements 
SubCommand {
             if (commandLine.hasOption('g') && commandLine.hasOption('d')) {
                 final String consumerGroup = 
commandLine.getOptionValue('g').trim();
                 final String clientId = commandLine.getOptionValue('d').trim();
-                ConsumeMessageDirectlyResult result =
-                    defaultMQAdminExt.consumeMessageDirectly(consumerGroup, 
clientId, topic, msgId);
-                System.out.printf("%s", result);
+                ConsumerRunningInfo consumerRunningInfo = 
defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false, false);
+                if (ConsumerRunningInfo.isPushType(consumerRunningInfo)) {
+                    ConsumeMessageDirectlyResult result =
+                            
defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
+                    System.out.printf("%s", result);
+                } else {
+                    System.out.printf("this %s client is not push consumer 
,not support direct push \n", clientId);
+                }
+
             } else {
                 queryById(defaultMQAdminExt, topic, msgId, showAll);
             }

Reply via email to