vongosling closed pull request #378: [ISSUE #377] FIX Admin subcommand
consumeMessage should quit when catching an exception
URL: https://github.com/apache/rocketmq/pull/378
This is a pull request merged from a forked repository.
Because the pull request comes from a fork, GitHub no longer shows the
original diff once it has been merged, so the diff is reproduced below
for the sake of provenance:
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
index 518926774..aa98ee628 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
@@ -207,6 +207,7 @@ private void pullMessageByQueue(MessageQueue mq, long
minOffset, long maxOffset)
pullResult = defaultMQPullConsumer.pull(mq, "*", offset,
(int)(maxOffset - offset + 1));
} catch (Exception e) {
e.printStackTrace();
+ return;
}
if (pullResult != null) {
offset = pullResult.getNextBeginOffset();
diff --git
a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
index 9a5998e29..11543958c 100644
---
a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
+++
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
@@ -16,6 +16,13 @@
*/
package org.apache.rocketmq.tools.command.message;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
@@ -34,14 +41,6 @@
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -54,14 +53,14 @@
@BeforeClass
public static void init() throws MQClientException, RemotingException,
MQBrokerException, InterruptedException,
- NoSuchFieldException, IllegalAccessException{
+ NoSuchFieldException, IllegalAccessException {
consumeMessageCommand = new ConsumeMessageCommand();
DefaultMQPullConsumer defaultMQPullConsumer =
mock(DefaultMQPullConsumer.class);
MessageExt msg = new MessageExt();
- msg.setBody(new byte[]{'a'});
+ msg.setBody(new byte[] {'a'});
List<MessageExt> msgFoundList = new ArrayList<>();
msgFoundList.add(msg);
- final PullResult pullResult = new PullResult(PullStatus.FOUND,2, 0, 1,
msgFoundList);
+ final PullResult pullResult = new PullResult(PullStatus.FOUND, 2, 0,
1, msgFoundList);
when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(),
anyLong(), anyInt())).thenReturn(pullResult);
when(defaultMQPullConsumer.minOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(0));
@@ -73,8 +72,9 @@ public static void init() throws MQClientException,
RemotingException, MQBrokerE
Field producerField =
ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
producerField.setAccessible(true);
- producerField.set(consumeMessageCommand,defaultMQPullConsumer);
+ producerField.set(consumeMessageCommand, defaultMQPullConsumer);
}
+
@AfterClass
public static void terminate() {
}
@@ -102,11 +102,55 @@ public void testExecuteByCondition() throws
SubCommandException {
System.setOut(new PrintStream(bos));
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] subargs = new String[] {"-t mytopic","-b localhost","-i 0",
"-n localhost:9876"};
+ String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0",
"-n localhost:9876"};
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " +
consumeMessageCommand.commandName(), subargs,
consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
consumeMessageCommand.execute(commandLine, options, null);
System.setOut(out);
String s = new String(bos.toByteArray());
Assert.assertTrue(s.contains("Consume ok"));
}
+
+ @Test
+ public void testExecuteDefaultWhenPullMessageByQueueGotException() throws
SubCommandException, InterruptedException, RemotingException,
MQClientException, MQBrokerException, NoSuchFieldException,
IllegalAccessException {
+ DefaultMQPullConsumer defaultMQPullConsumer =
mock(DefaultMQPullConsumer.class);
+ when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(),
anyLong(), anyInt())).thenThrow(Exception.class);
+ Field producerField =
ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
+ producerField.setAccessible(true);
+ producerField.set(consumeMessageCommand, defaultMQPullConsumer);
+
+ PrintStream out = System.out;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(bos));
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ String[] subargs = new String[] {"-t topic-not-existu", "-n
localhost:9876"};
+ CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " +
consumeMessageCommand.commandName(),
+ subargs, consumeMessageCommand.buildCommandlineOptions(options),
new PosixParser());
+ consumeMessageCommand.execute(commandLine, options, null);
+
+ System.setOut(out);
+ String s = new String(bos.toByteArray());
+ Assert.assertTrue(!s.contains("Consume ok"));
+ }
+
+ @Test
+ public void testExecuteByConditionWhenPullMessageByQueueGotException()
throws IllegalAccessException, InterruptedException, RemotingException,
MQClientException, MQBrokerException, NoSuchFieldException, SubCommandException
{
+ DefaultMQPullConsumer defaultMQPullConsumer =
mock(DefaultMQPullConsumer.class);
+ when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(),
anyLong(), anyInt())).thenThrow(Exception.class);
+ Field producerField =
ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
+ producerField.setAccessible(true);
+ producerField.set(consumeMessageCommand, defaultMQPullConsumer);
+
+ PrintStream out = System.out;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(bos));
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+
+ String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0",
"-n localhost:9876"};
+ CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " +
consumeMessageCommand.commandName(), subargs,
consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
+ consumeMessageCommand.execute(commandLine, options, null);
+
+ System.setOut(out);
+ String s = new String(bos.toByteArray());
+ Assert.assertTrue(!s.contains("Consume ok"));
+ }
}
\ No newline at end of file
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services