This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3f7dda3 [ISSUE #377] FIX Admin subcommand consumeMessage should quit
when catching an exception (#378)
3f7dda3 is described below
commit 3f7dda3c0c5fb7de74ece7b9ce690a978c58a319
Author: XiaoZYang <[email protected]>
AuthorDate: Mon Jul 23 19:50:51 2018 +0800
[ISSUE #377] FIX Admin subcommand consumeMessage should quit when catching
an exception (#378)
---
.../command/message/ConsumeMessageCommand.java | 1 +
.../command/message/ConsumeMessageCommandTest.java | 70 ++++++++++++++++++----
2 files changed, 58 insertions(+), 13 deletions(-)
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
index 5189267..aa98ee6 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommand.java
@@ -207,6 +207,7 @@ public class ConsumeMessageCommand implements SubCommand {
pullResult = defaultMQPullConsumer.pull(mq, "*", offset,
(int)(maxOffset - offset + 1));
} catch (Exception e) {
e.printStackTrace();
+ return;
}
if (pullResult != null) {
offset = pullResult.getNextBeginOffset();
diff --git
a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
index 9a5998e..1154395 100644
---
a/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
+++
b/tools/src/test/java/org/apache/rocketmq/tools/command/message/ConsumeMessageCommandTest.java
@@ -16,6 +16,13 @@
*/
package org.apache.rocketmq.tools.command.message;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
@@ -34,14 +41,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -54,14 +53,14 @@ public class ConsumeMessageCommandTest {
@BeforeClass
public static void init() throws MQClientException, RemotingException,
MQBrokerException, InterruptedException,
- NoSuchFieldException, IllegalAccessException{
+ NoSuchFieldException, IllegalAccessException {
consumeMessageCommand = new ConsumeMessageCommand();
DefaultMQPullConsumer defaultMQPullConsumer =
mock(DefaultMQPullConsumer.class);
MessageExt msg = new MessageExt();
- msg.setBody(new byte[]{'a'});
+ msg.setBody(new byte[] {'a'});
List<MessageExt> msgFoundList = new ArrayList<>();
msgFoundList.add(msg);
- final PullResult pullResult = new PullResult(PullStatus.FOUND,2, 0, 1,
msgFoundList);
+ final PullResult pullResult = new PullResult(PullStatus.FOUND, 2, 0,
1, msgFoundList);
when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(),
anyLong(), anyInt())).thenReturn(pullResult);
when(defaultMQPullConsumer.minOffset(any(MessageQueue.class))).thenReturn(Long.valueOf(0));
@@ -73,8 +72,9 @@ public class ConsumeMessageCommandTest {
Field producerField =
ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
producerField.setAccessible(true);
- producerField.set(consumeMessageCommand,defaultMQPullConsumer);
+ producerField.set(consumeMessageCommand, defaultMQPullConsumer);
}
+
@AfterClass
public static void terminate() {
}
@@ -102,11 +102,55 @@ public class ConsumeMessageCommandTest {
System.setOut(new PrintStream(bos));
Options options = ServerUtil.buildCommandlineOptions(new Options());
- String[] subargs = new String[] {"-t mytopic","-b localhost","-i 0",
"-n localhost:9876"};
+ String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0",
"-n localhost:9876"};
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " +
consumeMessageCommand.commandName(), subargs,
consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
consumeMessageCommand.execute(commandLine, options, null);
System.setOut(out);
String s = new String(bos.toByteArray());
Assert.assertTrue(s.contains("Consume ok"));
}
+
+ @Test
+ public void testExecuteDefaultWhenPullMessageByQueueGotException() throws
SubCommandException, InterruptedException, RemotingException,
MQClientException, MQBrokerException, NoSuchFieldException,
IllegalAccessException {
+ DefaultMQPullConsumer defaultMQPullConsumer =
mock(DefaultMQPullConsumer.class);
+ when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(),
anyLong(), anyInt())).thenThrow(Exception.class);
+ Field producerField =
ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
+ producerField.setAccessible(true);
+ producerField.set(consumeMessageCommand, defaultMQPullConsumer);
+
+ PrintStream out = System.out;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(bos));
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ String[] subargs = new String[] {"-t topic-not-existu", "-n
localhost:9876"};
+ CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " +
consumeMessageCommand.commandName(),
+ subargs, consumeMessageCommand.buildCommandlineOptions(options),
new PosixParser());
+ consumeMessageCommand.execute(commandLine, options, null);
+
+ System.setOut(out);
+ String s = new String(bos.toByteArray());
+ Assert.assertTrue(!s.contains("Consume ok"));
+ }
+
+ @Test
+ public void testExecuteByConditionWhenPullMessageByQueueGotException()
throws IllegalAccessException, InterruptedException, RemotingException,
MQClientException, MQBrokerException, NoSuchFieldException, SubCommandException
{
+ DefaultMQPullConsumer defaultMQPullConsumer =
mock(DefaultMQPullConsumer.class);
+ when(defaultMQPullConsumer.pull(any(MessageQueue.class), anyString(),
anyLong(), anyInt())).thenThrow(Exception.class);
+ Field producerField =
ConsumeMessageCommand.class.getDeclaredField("defaultMQPullConsumer");
+ producerField.setAccessible(true);
+ producerField.set(consumeMessageCommand, defaultMQPullConsumer);
+
+ PrintStream out = System.out;
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(bos));
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+
+ String[] subargs = new String[] {"-t mytopic", "-b localhost", "-i 0",
"-n localhost:9876"};
+ CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " +
consumeMessageCommand.commandName(), subargs,
consumeMessageCommand.buildCommandlineOptions(options), new PosixParser());
+ consumeMessageCommand.execute(commandLine, options, null);
+
+ System.setOut(out);
+ String s = new String(bos.toByteArray());
+ Assert.assertTrue(!s.contains("Consume ok"));
+ }
}
\ No newline at end of file