This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 17634d3317b857b1f784aa59af458da1d3f5206c Author: YangJodie <[email protected]> AuthorDate: Sun Aug 21 18:29:59 2022 +0800 [ISSUE apache#4858] add consumer type check for message direct push --- .../client/impl/factory/MQClientInstance.java | 2 +- .../common/protocol/body/ConsumerRunningInfo.java | 19 ++++++++++--------- .../tools/command/message/QueryMsgByIdSubCommand.java | 12 +++++++++--- .../message/QueryMsgByUniqueKeySubCommand.java | 13 ++++++++++--- 4 files changed, 30 insertions(+), 16 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 56c7edf60..f87a7a84a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -1269,7 +1269,7 @@ public class MQClientInstance { final String consumerGroup, final String brokerName) { MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup); - if (null != mqConsumerInner) { + if (null != mqConsumerInner && mqConsumerInner instanceof DefaultMQPushConsumerImpl) { DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) mqConsumerInner; ConsumeMessageDirectlyResult result = consumer.getConsumeMessageService().consumeMessageDirectly(msg, brokerName); diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java index 6b0355fd8..4a68b25a5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java @@ -52,15 +52,7 @@ public class ConsumerRunningInfo extends RemotingSerializable { public static boolean analyzeSubscription(final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) { ConsumerRunningInfo prev = criTable.firstEntry().getValue(); - boolean push = false; - { - String property = prev.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE); - - if (property == null) { - property = ((ConsumeType) prev.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name(); - } - push = ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY; - } + boolean push = isPushType(prev); boolean startForAWhile = false; { @@ -103,6 +95,15 @@ public class ConsumerRunningInfo extends RemotingSerializable { return true; } + public static boolean isPushType(ConsumerRunningInfo consumerRunningInfo) { + String property = consumerRunningInfo.getProperties().getProperty(ConsumerRunningInfo.PROP_CONSUME_TYPE); + + if (property == null) { + property = ((ConsumeType) consumerRunningInfo.getProperties().get(ConsumerRunningInfo.PROP_CONSUME_TYPE)).name(); + } + return ConsumeType.valueOf(property) == ConsumeType.CONSUME_PASSIVELY; + } + public static boolean analyzeRebalance(final TreeMap<String/* clientId */, ConsumerRunningInfo> criTable) { return true; } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java index e118d00f7..ab06aae74 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java @@ -34,6 +34,7 @@ import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -277,9 +278,14 @@ public class QueryMsgByIdSubCommand implements SubCommand { private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final String consumerGroup, final String clientId, final String msgId) { try { - ConsumeMessageDirectlyResult result = - defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, msgId); - System.out.printf("%s", result); + ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false, false); + if (ConsumerRunningInfo.isPushType(consumerRunningInfo)) { + ConsumeMessageDirectlyResult result = + defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, msgId); + System.out.printf("%s", result); + } else { + System.out.printf("this %s client is not push consumer ,not support direct push \n", clientId); + } } catch (Exception e) { e.printStackTrace(); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java index 17abdf218..ebb8eb579 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java @@ -30,6 +30,7 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -180,9 +181,15 @@ public class QueryMsgByUniqueKeySubCommand implements SubCommand { if (commandLine.hasOption('g') && commandLine.hasOption('d')) { final String consumerGroup = commandLine.getOptionValue('g').trim(); final String clientId = commandLine.getOptionValue('d').trim(); - ConsumeMessageDirectlyResult result = - defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); - System.out.printf("%s", result); + ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(consumerGroup, clientId, false, false); + if (ConsumerRunningInfo.isPushType(consumerRunningInfo)) { + ConsumeMessageDirectlyResult result = + defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); + System.out.printf("%s", result); + } else { + System.out.printf("this %s client is not push consumer ,not support direct push \n", clientId); + } + } else { queryById(defaultMQAdminExt, topic, msgId, showAll); }
