This is an automated email from the ASF dual-hosted git repository.
vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 947526b [ROCKETMQ-266] Add a specific Exception message for comparing
consumerThreadMax and consumerThreadMin (#147)
947526b is described below
commit 947526b7ce021b5cb9510a8197590ee33a90b938
Author: Mark_Yang <[email protected]>
AuthorDate: Wed Dec 13 05:35:52 2017 -0600
[ROCKETMQ-266] Add a specific Exception message for comparing
consumerThreadMax and consumerThreadMin (#147)
* Can’t start consumer with a small consumerThreadMax number
* Can’t start consumer with a small consumerThreadMax number
* test case for ROCKETMQ-266
* Can’t start consumer with a small consumerThreadMax number
* Can’t start consumer with a small consumerThreadMax number
* fix merge conflict
* update test case for ROCKETMQ-266
* for ROCKETMQ-266
* Trigger rebuild
* Trigger rebuild
* Trigger rebuild
---
.../impl/consumer/DefaultMQPushConsumerImpl.java | 11 +++-
.../consumer/DefaultMQPushConsumerImplTest.java | 61 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 72bc953..f560376 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -711,8 +711,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
// consumeThreadMin
if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
- || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000
- || this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
+ || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
throw new MQClientException(
"consumeThreadMin Out of range [1, 1000]"
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
@@ -727,6 +726,14 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
null);
}
+ // consumeThreadMin can't be larger than consumeThreadMax
+ if (this.defaultMQPushConsumer.getConsumeThreadMin() > this.defaultMQPushConsumer.getConsumeThreadMax()) {
+ throw new MQClientException(
+ "consumeThreadMin (" + this.defaultMQPushConsumer.getConsumeThreadMin() + ") "
+ + "is larger than consumeThreadMax (" + this.defaultMQPushConsumer.getConsumeThreadMax() + ")",
+ null);
+ }
+
// consumeConcurrentlyMaxSpan
if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
|| this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 65535) {
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
new file mode 100644
index 0000000..d4f5812
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.List;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class DefaultMQPushConsumerImplTest {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void checkConfigTest() throws MQClientException {
+
+ //test type
+ thrown.expect(MQClientException.class);
+
+ //test message
+ thrown.expectMessage("consumeThreadMin (10) is larger than consumeThreadMax (9)");
+
+ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group");
+
+ consumer.setConsumeThreadMin(10);
+ consumer.setConsumeThreadMax(9);
+
+ consumer.registerMessageListener(new MessageListenerConcurrently() {
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
+ ConsumeConcurrentlyContext context) {
+ System.out.println(" Receive New Messages: " + msgs);
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+ }
+ });
+
+ DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(consumer, null);
+ defaultMQPushConsumerImpl.start();
+ }
+}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].