This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 947526b  [ROCKETMQ-266] Add a specific Exception message for comparing 
consumerThreadMax and consumerThreadMin (#147)
947526b is described below

commit 947526b7ce021b5cb9510a8197590ee33a90b938
Author: Mark_Yang <[email protected]>
AuthorDate: Wed Dec 13 05:35:52 2017 -0600

    [ROCKETMQ-266] Add a specific Exception message for comparing 
consumerThreadMax and consumerThreadMin (#147)
    
    * Can’t start consumer with a small consumerThreadMax number
    
    * Can’t start consumer with a small consumerThreadMax number
    
    * test case for ROCKETMQ-266
    
    * Can’t start consumer with a small consumerThreadMax number
    
    * Can’t start consumer with a small consumerThreadMax number
    
    * fix merge conflict
    
    * update test case for ROCKETMQ-266
    
    * for ROCKETMQ-266
    
    * Trigger rebuild
    
    * Trigger rebuild
    
    * Trigger rebuild
---
 .../impl/consumer/DefaultMQPushConsumerImpl.java   | 11 +++-
 .../consumer/DefaultMQPushConsumerImplTest.java    | 61 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 2 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 72bc953..f560376 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -711,8 +711,7 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
 
         // consumeThreadMin
         if (this.defaultMQPushConsumer.getConsumeThreadMin() < 1
-            || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000
-            || this.defaultMQPushConsumer.getConsumeThreadMin() > 
this.defaultMQPushConsumer.getConsumeThreadMax()) {
+            || this.defaultMQPushConsumer.getConsumeThreadMin() > 1000) {
             throw new MQClientException(
                 "consumeThreadMin Out of range [1, 1000]"
                     + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
@@ -727,6 +726,14 @@ public class DefaultMQPushConsumerImpl implements 
MQConsumerInner {
                 null);
         }
 
+        // consumeThreadMin can't be larger than consumeThreadMax
+        if (this.defaultMQPushConsumer.getConsumeThreadMin() > 
this.defaultMQPushConsumer.getConsumeThreadMax()) {
+            throw new MQClientException(
+                "consumeThreadMin (" + 
this.defaultMQPushConsumer.getConsumeThreadMin() + ") "
+                    + "is larger than consumeThreadMax (" + 
this.defaultMQPushConsumer.getConsumeThreadMax() + ")",
+                null);
+        }
+
         // consumeConcurrentlyMaxSpan
         if (this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() < 1
             || this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan() > 
65535) {
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
new file mode 100644
index 0000000..d4f5812
--- /dev/null
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.List;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class DefaultMQPushConsumerImplTest {
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Test
+    public void checkConfigTest() throws MQClientException {
+
+        //test type
+        thrown.expect(MQClientException.class);
+
+        //test message
+        thrown.expectMessage("consumeThreadMin (10) is larger than 
consumeThreadMax (9)");
+
+        DefaultMQPushConsumer consumer = new 
DefaultMQPushConsumer("test_consumer_group");
+
+        consumer.setConsumeThreadMin(10);
+        consumer.setConsumeThreadMax(9);
+
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs,
+                ConsumeConcurrentlyContext context) {
+                System.out.println(" Receive New Messages: " + msgs);
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+
+        DefaultMQPushConsumerImpl defaultMQPushConsumerImpl = new 
DefaultMQPushConsumerImpl(consumer, null);
+        defaultMQPushConsumerImpl.start();
+    }
+}

-- 
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].

Reply via email to