vongosling closed pull request #332: [ROCKETMQ-353] Add sendMessageCommand and 
consumeMessageCommand
URL: https://github.com/apache/rocketmq/pull/332
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request originates from a fork, GitHub no longer renders
its diff once it has been merged, so the complete patch is reproduced below:

diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java 
b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index d3342e818..c189e866d 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@ -48,12 +48,14 @@
 import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
 import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
 import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
+import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
 import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;
 import org.apache.rocketmq.tools.command.message.PrintMessageSubCommand;
 import org.apache.rocketmq.tools.command.message.QueryMsgByIdSubCommand;
 import org.apache.rocketmq.tools.command.message.QueryMsgByKeySubCommand;
 import org.apache.rocketmq.tools.command.message.QueryMsgByOffsetSubCommand;
 import org.apache.rocketmq.tools.command.message.QueryMsgByUniqueKeySubCommand;
+import org.apache.rocketmq.tools.command.message.SendMessageCommand;
 import org.apache.rocketmq.tools.command.namesrv.DeleteKvConfigCommand;
 import org.apache.rocketmq.tools.command.namesrv.GetNamesrvConfigCommand;
 import org.apache.rocketmq.tools.command.namesrv.UpdateKvConfigCommand;
@@ -193,6 +195,8 @@ public static void initCommand() {
         initCommand(new GetBrokerConfigCommand());
 
         initCommand(new QueryConsumeQueueCommand());
+        initCommand(new SendMessageCommand());
+        initCommand(new ConsumeMessageCommand());
     }
 
     private static void initLogback() throws JoranException {
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
new file mode 100644
index 000000000..518926774
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.message;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+import java.util.Set;
+
+public class ConsumeMessageCommand implements SubCommand {
+
+    private String topic = null;
+    private long messageCount = 128;
+    private DefaultMQPullConsumer defaultMQPullConsumer;
+
+
+    public enum ConsumeType {
+        /**
+         * Topic only
+         */
+        DEFAULT,
+        /**
+         * Topic brokerName queueId set
+         */
+        BYQUEUE,
+        /**
+         * Topic brokerName queueId offset set
+         */
+        BYOFFSET
+    }
+
+    private static long timestampFormat(final String value) {
+        long timestamp;
+        try {
+            timestamp = Long.parseLong(value);
+        } catch (NumberFormatException e) {
+            timestamp = UtilAll.parseDate(value, 
UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime();
+        }
+
+        return timestamp;
+    }
+    @Override
+    public String commandName() {
+        return "consumeMessage";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Consume message";
+    }
+
+    @Override
+    public Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("t", "topic", true, "Topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("b", "brokerName", true, "Broker name");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("i", "queueId", true, "Queue Id");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("o", "offset", true, "Queue offset");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("g", "consumerGroup", true, "Consumer group name");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("s", "beginTimestamp ", true,
+                "Begin timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("e", "endTimestamp ", true,
+                "End timestamp[currentTimeMillis|yyyy-MM-dd#HH:mm:ss:SSS]");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "MessageNumber", true, "Number of message to be 
consumed");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+
+        return options;
+
+    }
+
+    @Override
+    public void execute(final CommandLine commandLine, final Options options, 
RPCHook rpcHook) throws SubCommandException {
+        if (defaultMQPullConsumer == null) {
+            defaultMQPullConsumer = new 
DefaultMQPullConsumer(MixAll.TOOLS_CONSUMER_GROUP, rpcHook);
+        }
+        
defaultMQPullConsumer.setInstanceName(Long.toString(System.currentTimeMillis()));
+
+        long offset = 0;
+        long timeValueEnd = 0;
+        long timeValueBegin = 0;
+        String queueId = null;
+        String brokerName = null;
+        ConsumeType consumeType = ConsumeType.DEFAULT;
+
+        try {
+            /* Group name must be set before consumer start */
+            if (commandLine.hasOption('g')) {
+                String consumerGroup = commandLine.getOptionValue('b').trim();
+                defaultMQPullConsumer.setConsumerGroup(consumerGroup);
+            }
+
+            defaultMQPullConsumer.start();
+
+            topic = commandLine.getOptionValue('t').trim();
+
+            if (commandLine.hasOption('c')) {
+                messageCount = 
Long.parseLong(commandLine.getOptionValue('c').trim());
+                if (messageCount <= 0) {
+                    System.out.print("please input a positive messageNumber!");
+                    return;
+                }
+            }
+            if (commandLine.hasOption('b')) {
+                brokerName = commandLine.getOptionValue('b').trim();
+
+            }
+            if (commandLine.hasOption('i')) {
+                if (!commandLine.hasOption('b')) {
+                    System.out.print("Please set the brokerName before 
queueId!");
+                    return;
+                }
+                queueId = commandLine.getOptionValue('i').trim();
+
+                consumeType = ConsumeType.BYQUEUE;
+            }
+            if (commandLine.hasOption('o')) {
+                if (consumeType != ConsumeType.BYQUEUE) {
+                    System.out.print("please set queueId before offset!");
+                    return;
+                }
+                offset = 
Long.parseLong(commandLine.getOptionValue('o').trim());
+                consumeType = ConsumeType.BYOFFSET;
+            }
+
+            if (commandLine.hasOption('s')) {
+                String timestampStr = commandLine.getOptionValue('s').trim();
+                timeValueBegin = timestampFormat(timestampStr);
+            }
+            if (commandLine.hasOption('e')) {
+                String timestampStr = commandLine.getOptionValue('e').trim();
+                timeValueEnd = timestampFormat(timestampStr);
+            }
+
+            switch (consumeType) {
+                case DEFAULT:
+                    executeDefault(timeValueBegin, timeValueEnd);
+                    break;
+                case BYOFFSET:
+                    executeByCondition(brokerName, queueId, offset, 
timeValueBegin, timeValueEnd);
+                    break;
+                case BYQUEUE:
+                    executeByCondition(brokerName, queueId, 0, timeValueBegin, 
timeValueEnd);
+                    break;
+                default:
+                    System.out.print("Unknown type of consume!");
+                    break;
+            }
+
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " 
command failed", e);
+        } finally {
+            defaultMQPullConsumer.shutdown();
+        }
+    }
+
+    private void pullMessageByQueue(MessageQueue mq, long minOffset, long 
maxOffset) {
+        READQ:
+        for (long offset = minOffset; offset <= maxOffset; ) {
+            PullResult pullResult = null;
+            try {
+                pullResult = defaultMQPullConsumer.pull(mq, "*", offset, 
(int)(maxOffset - offset + 1));
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+            if (pullResult != null) {
+                offset = pullResult.getNextBeginOffset();
+                switch (pullResult.getPullStatus()) {
+                    case FOUND:
+                        System.out.print("Consume ok\n");
+                        
PrintMessageByQueueCommand.printMessage(pullResult.getMsgFoundList(), "UTF-8",
+                            true, true);
+                        break;
+                    case NO_MATCHED_MSG:
+                        System.out.printf("%s no matched msg. status=%s, 
offset=%s\n", mq, pullResult.getPullStatus(),
+                            offset);
+                        break;
+                    case NO_NEW_MSG:
+                    case OFFSET_ILLEGAL:
+                        System.out.printf("%s print msg finished. status=%s, 
offset=%s\n", mq,
+                            pullResult.getPullStatus(), offset);
+                        break READQ;
+                    default:
+                        break;
+                }
+            }
+        }
+    }
+
+    private void executeDefault(long timeValueBegin, long timeValueEnd) {
+        try {
+            Set<MessageQueue> mqs = 
defaultMQPullConsumer.fetchSubscribeMessageQueues(topic);
+            long countLeft = messageCount;
+            for (MessageQueue mq : mqs) {
+                if (countLeft == 0) {
+                    return;
+                }
+                long minOffset = defaultMQPullConsumer.minOffset(mq);
+                long maxOffset = defaultMQPullConsumer.maxOffset(mq);
+                if (timeValueBegin > 0) {
+                    minOffset = defaultMQPullConsumer.searchOffset(mq, 
timeValueBegin);
+                }
+                if (timeValueEnd > 0) {
+                    maxOffset = defaultMQPullConsumer.searchOffset(mq, 
timeValueEnd);
+                }
+                if (maxOffset - minOffset > countLeft) {
+                    System.out.printf("The older %d message of the %d queue 
will be provided\n", countLeft, mq.getQueueId());
+                    maxOffset = minOffset + countLeft - 1;
+                    countLeft = 0;
+                } else {
+                    countLeft = countLeft - (maxOffset - minOffset) - 1;
+                }
+
+                pullMessageByQueue(mq, minOffset, maxOffset);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    private void executeByCondition(String brokerName, String queueId, long 
offset, long timeValueBegin, long timeValueEnd) {
+        MessageQueue mq = new MessageQueue(topic, brokerName, 
Integer.parseInt(queueId));
+        try {
+            long minOffset = defaultMQPullConsumer.minOffset(mq);
+            long maxOffset = defaultMQPullConsumer.maxOffset(mq);
+            if (timeValueBegin > 0) {
+                minOffset = defaultMQPullConsumer.searchOffset(mq, 
timeValueBegin);
+            }
+            if (timeValueEnd > 0) {
+                maxOffset = defaultMQPullConsumer.searchOffset(mq, 
timeValueEnd);
+            }
+            if (offset > maxOffset) {
+                System.out.printf("%s no matched msg, offset=%s\n", mq, 
offset);
+                return;
+            }
+            minOffset = minOffset > offset ? minOffset : offset;
+            if (maxOffset - minOffset > messageCount) {
+                System.out.printf("The oldler %d message will be provided\n", 
messageCount);
+                maxOffset = minOffset + messageCount - 1;
+            }
+
+            pullMessageByQueue(mq, minOffset, maxOffset);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java
new file mode 100644
index 000000000..e4921c6f0
--- /dev/null
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/SendMessageCommand.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.message;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.tools.command.SubCommand;
+import org.apache.rocketmq.tools.command.SubCommandException;
+
+/**
+ * {@code sendMessage} sub-command: sends a single message to a topic and
+ * prints the broker, queue id, send status and message id of the result.
+ *
+ * <p>When both a broker name ({@code -b}) and a queue id ({@code -i}) are
+ * supplied the message is sent to that exact queue; otherwise the producer
+ * selects the queue.
+ */
+public class SendMessageCommand implements SubCommand {
+
+    private DefaultMQProducer producer;
+
+    @Override
+    public String commandName() {
+        return "sendMessage";
+    }
+
+    @Override
+    public String commandDesc() {
+        return "Send a message";
+    }
+
+    /**
+     * Registers the options understood by this command on the given option set.
+     *
+     * @param options option set to extend
+     * @return the same {@code options} instance
+     */
+    @Override
+    public Options buildCommandlineOptions(Options options) {
+        Option opt = new Option("t", "topic", true, "Topic name");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("p", "body", true, "UTF-8 string format of the message body");
+        opt.setRequired(true);
+        options.addOption(opt);
+
+        opt = new Option("k", "key", true, "Message keys");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "tags", true, "Message tags");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("b", "broker", true, "Send message to target broker");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("i", "qid", true, "Send message to target queue");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        return options;
+    }
+
+    /**
+     * Returns the producer held by this command, creating it on first use.
+     *
+     * @param rpcHook hook passed to a newly created producer
+     * @return the cached or newly created producer
+     */
+    private DefaultMQProducer createProducer(RPCHook rpcHook) {
+        if (this.producer == null) {
+            this.producer = new DefaultMQProducer(rpcHook);
+            // The current time is used as the producer group name.
+            this.producer.setProducerGroup(Long.toString(System.currentTimeMillis()));
+        }
+        return this.producer;
+    }
+
+    /**
+     * Builds the message from the command line, sends it and prints the result.
+     *
+     * @param commandLine parsed command line
+     * @param options     option definitions (unused here)
+     * @param rpcHook     hook handed to the producer when it has to be created
+     * @throws SubCommandException if the arguments cannot be parsed or the send fails
+     */
+    @Override
+    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook)
+        throws SubCommandException {
+        Message msg = null;
+        String topic = commandLine.getOptionValue('t').trim();
+        String body = commandLine.getOptionValue('p').trim();
+        String tag = null;
+        String keys = null;
+        String brokerName = null;
+        int queueId = -1;
+        try {
+            if (commandLine.hasOption('k')) {
+                keys = commandLine.getOptionValue('k').trim();
+            }
+            if (commandLine.hasOption('c')) {
+                tag = commandLine.getOptionValue('c').trim();
+            }
+            if (commandLine.hasOption('b')) {
+                brokerName = commandLine.getOptionValue('b').trim();
+            }
+            if (commandLine.hasOption('i')) {
+                // A queue id alone does not identify a queue; the broker is needed too.
+                if (!commandLine.hasOption('b')) {
+                    System.out.print("Broker name must be set if the queue is chosen!");
+                    return;
+                } else {
+                    queueId = Integer.parseInt(commandLine.getOptionValue('i').trim());
+                }
+            }
+            msg = new Message(topic, tag, keys, body.getBytes("utf-8"));
+        } catch (Exception e) {
+            // Report through the declared checked exception, as ConsumeMessageCommand does,
+            // instead of an undeclared RuntimeException.
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        }
+
+        DefaultMQProducer mqProducer = this.createProducer(rpcHook);
+        SendResult result;
+        try {
+            mqProducer.start();
+            if (brokerName != null && queueId > -1) {
+                MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
+                result = mqProducer.send(msg, messageQueue);
+            } else {
+                result = mqProducer.send(msg);
+            }
+
+        } catch (Exception e) {
+            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
+        } finally {
+            mqProducer.shutdown();
+        }
+
+        System.out.printf("%-32s  %-4s  %-20s    %s%n",
+            "#Broker Name",
+            "#QID",
+            "#Send Result",
+            "#MsgId"
+        );
+
+        if (result != null) {
+            System.out.printf("%-32s  %-4s  %-20s    %s%n",
+                result.getMessageQueue().getBrokerName(),
+                result.getMessageQueue().getQueueId(),
+                result.getSendStatus(),
+                result.getMsgId()
+            );
+        } else {
+            System.out.printf("%-32s  %-4s  %-20s    %s%n",
+                "Unknown",
+                "Unknown",
+                "Failed",
+                "None"
+            );
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
new file mode 100644
index 000000000..9a5998e29
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tools.command.message;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link ConsumeMessageCommand} that run against a mocked
+ * {@link DefaultMQPullConsumer} injected through reflection.
+ */
+public class ConsumeMessageCommandTest {
+    private static ConsumeMessageCommand consumeMessageCommand;
+
+    /**
+     * Creates the command under test and injects a mocked pull consumer that
+     * reports offsets 0..1 and answers every pull with one found message.
+     */
+    @BeforeClass
+    public static void init() throws MQClientException, RemotingException, MQBrokerException,
+        InterruptedException, NoSuchFieldException, IllegalAccessException {
+        consumeMessageCommand = new ConsumeMessageCommand();
+        DefaultMQPullConsumer defaultMQPullConsumer = mock(DefaultMQPullConsumer.class);
+        MessageExt msg = new MessageExt();
+        msg.setBody(new byte[] {'a'});
+        List<MessageExt> msgFoundList = new ArrayList<>();
+        msgFoundList.add(msg);
+        final PullResult pullResult = new PullResult(PullStatus.FOUND, 2, 0, 1, msgFoundList);
+
+        when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), anyLong(), anyInt()))
+            .thenReturn(pullResult);
+        when(defaultMQPullConsumer.minOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(0));
+        when(defaultMQPullConsumer.maxOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(1));
+
+        final Set<MessageQueue> mqList = new HashSet<>();
+        mqList.add(new MessageQueue());
+        when(defaultMQPullConsumer.fetchSubscribeMessageQueues(anyString())).thenReturn(mqList);
+
+        // Replace the private consumer field so execute() never creates a real client.
+        Field consumerField = ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
+        consumerField.setAccessible(true);
+        consumerField.set(consumeMessageCommand, defaultMQPullConsumer);
+    }
+
+    @AfterClass
+    public static void terminate() {
+    }
+
+    /** Topic-only invocation must print the "Consume ok" marker. */
+    @Test
+    public void testExecuteDefault() throws SubCommandException {
+        PrintStream out = System.out;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(bos));
+        try {
+            Options options = ServerUtil.buildCommandlineOptions(new Options());
+            String[] subargs = new String[] {"-t mytopic", "-n localhost:9876"};
+            CommandLine commandLine = ServerUtil.parseCmdLine(
+                "mqadmin " + consumeMessageCommand.commandName(),
+                subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
+            consumeMessageCommand.execute(commandLine, options, null);
+        } finally {
+            // Restore stdout even when execute() throws, so later tests still print normally.
+            System.setOut(out);
+        }
+        String s = new String(bos.toByteArray());
+        Assert.assertTrue(s.contains("Consume ok"));
+    }
+
+    /** Invocation with broker name and queue id must print the "Consume ok" marker. */
+    @Test
+    public void testExecuteByCondition() throws SubCommandException {
+        PrintStream out = System.out;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(bos));
+        try {
+            Options options = ServerUtil.buildCommandlineOptions(new Options());
+
+            String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0", "-n localhost:9876"};
+            CommandLine commandLine = ServerUtil.parseCmdLine(
+                "mqadmin " + consumeMessageCommand.commandName(),
+                subargs, consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
+            consumeMessageCommand.execute(commandLine, options, null);
+        } finally {
+            // Restore stdout even when execute() throws, so later tests still print normally.
+            System.setOut(out);
+        }
+        String s = new String(bos.toByteArray());
+        Assert.assertTrue(s.contains("Consume ok"));
+    }
+}
\ No newline at end of file
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/message/SendMessageCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/SendMessageCommandTest.java
new file mode 100644
index 000000000..e4c6673d0
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/SendMessageCommandTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.tools.command.message;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.srvutil.ServerUtil;
+import org.apache.rocketmq.tools.command.SubCommandException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@link SendMessageCommand} that runs against a mocked
+ * {@link DefaultMQProducer} injected through reflection.
+ */
+public class SendMessageCommandTest {
+
+    private static SendMessageCommand sendMessageCommand = new SendMessageCommand();
+
+    /**
+     * Injects a mocked producer whose {@code send} calls return a fixed
+     * {@code SEND_OK} result.
+     */
+    @BeforeClass
+    public static void init() throws MQClientException, RemotingException, InterruptedException,
+        MQBrokerException, NoSuchFieldException, IllegalAccessException {
+
+        DefaultMQProducer defaultMQProducer = mock(DefaultMQProducer.class);
+        SendResult sendResult = new SendResult();
+        sendResult.setMessageQueue(new MessageQueue());
+        sendResult.getMessageQueue().setBrokerName("broker1");
+        sendResult.getMessageQueue().setQueueId(1);
+        sendResult.setSendStatus(SendStatus.SEND_OK);
+        sendResult.setMsgId("fgwejigherughwueyutyu4t4343t43");
+
+        when(defaultMQProducer.send(any(Message.class))).thenReturn(sendResult);
+        when(defaultMQProducer.send(any(Message.class), any(MessageQueue.class))).thenReturn(sendResult);
+
+        // Replace the private producer field so execute() never creates a real client.
+        Field producerField = SendMessageCommand.class.getDeclaredField("producer");
+        producerField.setAccessible(true);
+        producerField.set(sendMessageCommand, defaultMQProducer);
+    }
+
+    @AfterClass
+    public static void terminate() {
+    }
+
+    /** Sends once without and once with an explicit broker/queue and expects SEND_OK output. */
+    @Test
+    public void testExecute() throws SubCommandException {
+        PrintStream out = System.out;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(bos));
+        try {
+            Options options = ServerUtil.buildCommandlineOptions(new Options());
+            String[] subargs = new String[] {"-t mytopic", "-p 'send message test'", "-c tagA",
+                "-k order-16546745756"};
+            CommandLine commandLine = ServerUtil.parseCmdLine(
+                "mqadmin " + sendMessageCommand.commandName(), subargs,
+                sendMessageCommand.buildCommandlineOptions(options), new PosixParser());
+            sendMessageCommand.execute(commandLine, options, null);
+
+            subargs = new String[] {"-t mytopic", "-p 'send message test'", "-c tagA",
+                "-k order-16546745756", "-b brokera", "-i 1"};
+            commandLine = ServerUtil.parseCmdLine(
+                "mqadmin " + sendMessageCommand.commandName(), subargs,
+                sendMessageCommand.buildCommandlineOptions(options), new PosixParser());
+            sendMessageCommand.execute(commandLine, options, null);
+        } finally {
+            // Restore stdout even when execute() throws, so later tests still print normally.
+            System.setOut(out);
+        }
+        String s = new String(bos.toByteArray());
+        Assert.assertTrue(s.contains("SEND_OK"));
+    }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to