vongosling closed pull request #378: [ISSUE #377] FIX Admin subcommand 
consumeMessage should quit when catching an exception
URL: https://github.com/apache/rocketmq/pull/378
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
index 518926774..aa98ee628 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
@@ -207,6 +207,7 @@ private void pullMessageByQueue(MessageQueue mq, long 
minOffset, long maxOffset)
                 pullResult = defaultMQPullConsumer.pull(mq, "*", offset, 
(int)(maxOffset - offset + 1));
             } catch (Exception e) {
                 e.printStackTrace();
+                return;
             }
             if (pullResult != null) {
                 offset = pullResult.getNextBeginOffset();
diff --git 
a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
 
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
index 9a5998e29..11543958c 100644
--- 
a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
+++ 
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
@@ -16,6 +16,13 @@
  */
 package org.apache.rocketmq.tools.command.message;
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
@@ -34,14 +41,6 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -54,14 +53,14 @@
 
     @BeforeClass
     public static void init() throws MQClientException, RemotingException, 
MQBrokerException, InterruptedException,
-        NoSuchFieldException, IllegalAccessException{
+        NoSuchFieldException, IllegalAccessException {
         consumeMessageCommand = new ConsumeMessageCommand();
         DefaultMQPullConsumer defaultMQPullConsumer = 
mock(DefaultMQPullConsumer.class);
         MessageExt msg = new MessageExt();
-        msg.setBody(new byte[]{'a'});
+        msg.setBody(new byte[] {'a'});
         List<MessageExt> msgFoundList = new ArrayList<>();
         msgFoundList.add(msg);
-        final PullResult pullResult = new PullResult(PullStatus.FOUND,2, 0, 1, 
msgFoundList);
+        final PullResult pullResult = new PullResult(PullStatus.FOUND, 2, 0, 
1, msgFoundList);
 
         when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), 
anyLong(), anyInt())).thenReturn(pullResult);
         
when(defaultMQPullConsumer.minOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(0));
@@ -73,8 +72,9 @@ public static void init() throws MQClientException, 
RemotingException, MQBrokerE
 
         Field producerField = 
ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
         producerField.setAccessible(true);
-        producerField.set(consumeMessageCommand,defaultMQPullConsumer);
+        producerField.set(consumeMessageCommand, defaultMQPullConsumer);
     }
+
     @AfterClass
     public static void terminate() {
     }
@@ -102,11 +102,55 @@ public void testExecuteByCondition() throws 
SubCommandException {
         System.setOut(new PrintStream(bos));
         Options options = ServerUtil.buildCommandlineOptions(new Options());
 
-        String[] subargs = new String[] {"-t mytopic","-b localhost","-i 0", 
"-n localhost:9876"};
+        String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0", 
"-n localhost:9876"};
         CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + 
consumeMessageCommand.commandName(), subargs, 
consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
         consumeMessageCommand.execute(commandLine, options, null);
         System.setOut(out);
         String s = new String(bos.toByteArray());
         Assert.assertTrue(s.contains("Consume ok"));
     }
+
+    @Test
+    public void testExecuteDefaultWhenPullMessageByQueueGotException() throws 
SubCommandException, InterruptedException, RemotingException, 
MQClientException, MQBrokerException, NoSuchFieldException, 
IllegalAccessException {
+        DefaultMQPullConsumer defaultMQPullConsumer = 
mock(DefaultMQPullConsumer.class);
+        when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), 
anyLong(), anyInt())).thenThrow(Exception.class);
+        Field producerField = 
ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
+        producerField.setAccessible(true);
+        producerField.set(consumeMessageCommand, defaultMQPullConsumer);
+
+        PrintStream out = System.out;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(bos));
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        String[] subargs = new String[] {"-t topic-not-existu", "-n 
localhost:9876"};
+        CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + 
consumeMessageCommand.commandName(),
+            subargs, consumeMessageCommand.buildCommandlineOptions(options), 
new PosixParser());
+        consumeMessageCommand.execute(commandLine, options, null);
+
+        System.setOut(out);
+        String s = new String(bos.toByteArray());
+        Assert.assertTrue(!s.contains("Consume ok"));
+    }
+
+    @Test
+    public void testExecuteByConditionWhenPullMessageByQueueGotException() 
throws IllegalAccessException, InterruptedException, RemotingException, 
MQClientException, MQBrokerException, NoSuchFieldException, SubCommandException 
{
+        DefaultMQPullConsumer defaultMQPullConsumer = 
mock(DefaultMQPullConsumer.class);
+        when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(), 
anyLong(), anyInt())).thenThrow(Exception.class);
+        Field producerField = 
ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
+        producerField.setAccessible(true);
+        producerField.set(consumeMessageCommand, defaultMQPullConsumer);
+
+        PrintStream out = System.out;
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(bos));
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+
+        String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0", 
"-n localhost:9876"};
+        CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + 
consumeMessageCommand.commandName(), subargs, 
consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
+        consumeMessageCommand.execute(commandLine, options, null);
+
+        System.setOut(out);
+        String s = new String(bos.toByteArray());
+        Assert.assertTrue(!s.contains("Consume ok"));
+    }
 }
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to