This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3f7dda3  [ISSUE #377] FIX Admin subcommand consumeMessage should quit when catching an exception (#378)
3f7dda3 is described below

commit 3f7dda3c0c5fb7de74ece7b9ce690a978c58a319
Author: XiaoZYang <[email protected]>
AuthorDate: Mon Jul 23 19:50:51 2018 +0800

    [ISSUE #377] FIX Admin subcommand consumeMessage should quit when catching an exception (#378)
---
 .../command/message/ConsumeMessageCommand.java     |  1 +
 .../command/message/ConsumeMessageCommandTest.java | 70 ++++++++++++++++++----
 2 files changed, 58 insertions(+), 13 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
index 5189267..aa98ee6 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
@@ -207,6 +207,7 @@ public class ConsumeMessageCommand implements SubCommand {
                 pullResult = defaultMQPullConsumer.pull(mq, "*", offset, 
(int)(maxOffset - offset + 1));
             } catch (Exception e) {
                 e.printStackTrace();
+                return;
             }
             if (pullResult != null) {
                 offset = pullResult.getNextBeginOffset();
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
index 9a5998e..1154395 100644
--- 
a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
@@ -16,6 +16,13 @@
  */
 package org.apache.rocketmq.tools.command.message;
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
@@ -34,14 +41,6 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -54,14 +53,14 @@ public class ConsumeMessageCommandTest {
 
     @BeforeClass
     public static void init() throws MQClientException, RemotingException, 
MQBrokerException, InterruptedException,
-        NoSuchFieldException, IllegalAccessException{
+        NoSuchFieldException, IllegalAccessException {
         consumeMessageCommand = new ConsumeMessageCommand();
         DefaultMQPullConsumer defaultMQPullConsumer = 
mock(DefaultMQPullConsumer.class);
         MessageExt msg = new MessageExt();
-        msg.setBody(new byte[]{'a'});
+        msg.setBody(new byte[] {'a'});
         List<MessageExt> msgFoundList = new ArrayList<>();
         msgFoundList.add(msg);
-        final PullResult pullResult = new PullResult(PullStatus.FOUND,2, 0, 1, 
msgFoundList);
+        final PullResult pullResult = new PullResult(PullStatus.FOUND, 2, 0, 
1, msgFoundList);
 
         when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), 
anyLong(), anyInt())).thenReturn(pullResult);
         
when(defaultMQPullConsumer.minOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(0));
@@ -73,8 +72,9 @@ public class ConsumeMessageCommandTest {
 
         Field producerField = 
ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
         producerField.setAccessible(true);
-        producerField.set(consumeMessageCommand,defaultMQPullConsumer);
+        producerField.set(consumeMessageCommand, defaultMQPullConsumer);
     }
+
     @AfterClass
     public static void terminate() {
     }
@@ -102,11 +102,55 @@ public class ConsumeMessageCommandTest {
         System.setOut(new PrintStream(bos));
         Options options = ServerUtil.buildCommandlineOptions(new Options());
 
-        String[] subargs = new String[] {"-t mytopic","-b localhost","-i 0", 
"-n localhost:9876"};
+        String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0", 
"-n localhost:9876"};
         CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + 
consumeMessageCommand.commandName(), subargs, 
consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
         consumeMessageCommand.execute(commandLine, options, null);
         System.setOut(out);
         String s = new String(bos.toByteArray());
         Assert.assertTrue(s.contains("Consume ok"));
     }
+
+    @Test
+    public void testExecuteDefaultWhenPullMessageByQueueGotException() throws 
SubCommandException, InterruptedException, RemotingException, 
MQClientException, MQBrokerException, NoSuchFieldException, 
IllegalAccessException {
+        DefaultMQPullConsumer defaultMQPullConsumer = 
mock(DefaultMQPullConsumer.class);
+        when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), 
anyLong(), anyInt())).thenThrow(Exception.class);
+        Field producerField = 
ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
+        producerField.setAccessible(true);
+        producerField.set(consumeMessageCommand, defaultMQPullConsumer);
+
+        PrintStream out = System.out;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(bos));
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-t topic-not-existu", "-n 
localhost:9876"};
+        CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + 
consumeMessageCommand.commandName(),
+            subargs, consumeMessageCommand.buildCommandlineOptions(options), 
new PosixParser());
+        consumeMessageCommand.execute(commandLine, options, null);
+
+        System.setOut(out);
+        String s = new String(bos.toByteArray());
+        Assert.assertTrue(!s.contains("Consume ok"));
+    }
+
+    @Test
+    public void testExecuteByConditionWhenPullMessageByQueueGotException() 
throws IllegalAccessException, InterruptedException, RemotingException, 
MQClientException, MQBrokerException, NoSuchFieldException, SubCommandException 
{
+        DefaultMQPullConsumer defaultMQPullConsumer = 
mock(DefaultMQPullConsumer.class);
+        when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), 
anyLong(), anyInt())).thenThrow(Exception.class);
+        Field producerField = 
ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
+        producerField.setAccessible(true);
+        producerField.set(consumeMessageCommand, defaultMQPullConsumer);
+
+        PrintStream out = System.out;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(bos));
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+
+        String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0", 
"-n localhost:9876"};
+        CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + 
consumeMessageCommand.commandName(), subargs, 
consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
+        consumeMessageCommand.execute(commandLine, options, null);
+
+        System.setOut(out);
+        String s = new String(bos.toByteArray());
+        Assert.assertTrue(!s.contains("Consume ok"));
+    }
 }
\ No newline at end of file

Reply via email to