vongosling closed pull request #209: [ROCKETMQ-339] Specify consume thread pool 
for different topic
URL: https://github.com/apache/rocketmq/pull/209
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub no longer shows it once the pull request is merged:

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index d51030a15..bcad39ab4 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -19,6 +19,8 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadPoolExecutor;
+
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.consumer.listener.MessageListener;
@@ -694,4 +696,9 @@ public long getConsumeTimeout() {
     public void setConsumeTimeout(final long consumeTimeout) {
         this.consumeTimeout = consumeTimeout;
     }
+
+    @Override
+    public void subscribe(String topic, String subExpression, 
ThreadPoolExecutor consumeExecutor) throws MQClientException {
+        this.defaultMQPushConsumerImpl.subscribe(topic, subExpression, 
consumeExecutor);
+    }
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
index d56075c60..9b0401e64 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPushConsumer.java
@@ -21,6 +21,8 @@
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
 import org.apache.rocketmq.client.exception.MQClientException;
 
+import java.util.concurrent.ThreadPoolExecutor;
+
 /**
  * Push consumer
  */
@@ -54,6 +56,14 @@
      */
     void subscribe(final String topic, final String subExpression) throws 
MQClientException;
 
+    /**
+     *  Subscribe some topic
+     * @param subExpression subscription expression.it only support or 
operation such as "tag1 || tag2 || tag3" <br> if
+     * null or * expression,meaning subscribe all
+     * @param consumeExecutor consume listener callback threadexecutor
+     */
+    void subscribe(final String topic, final String subExpression, 
ThreadPoolExecutor consumeExecutor) throws MQClientException;
+
     /**
      * Subscribe some topic
      *
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
index 871201783..387ec24ae 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageConcurrentlyService.java
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -60,6 +61,8 @@
     private final ScheduledExecutorService scheduledExecutorService;
     private final ScheduledExecutorService cleanExpireMsgExecutors;
 
+    private final Map<String, ThreadPoolExecutor> consumeThreadExecutorMap = 
new HashMap<String, ThreadPoolExecutor>();
+
     public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl 
defaultMQPushConsumerImpl,
         MessageListenerConcurrently messageListener) {
         this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
@@ -95,9 +98,28 @@ public void run() {
     public void shutdown() {
         this.scheduledExecutorService.shutdown();
         this.consumeExecutor.shutdown();
+        Set<Map.Entry<String, ThreadPoolExecutor>> topicConsumeExecutors = 
consumeThreadExecutorMap.entrySet();
+        for (Map.Entry<String, ThreadPoolExecutor> topicConsumeExecutor : 
topicConsumeExecutors) {
+            topicConsumeExecutor.getValue().shutdown();
+        }
         this.cleanExpireMsgExecutors.shutdown();
     }
 
+    @Override
+    public void setConsumeThreadPoolExecutor(String topic, ThreadPoolExecutor 
threadPoolExecutor) {
+        consumeThreadExecutorMap.put(topic, threadPoolExecutor);
+    }
+
+    private ThreadPoolExecutor getConsumeExecutor(MessageQueue messageQueue) {
+        if (consumeThreadExecutorMap != null && messageQueue != null) {
+            ThreadPoolExecutor threadPoolExecutor = 
consumeThreadExecutorMap.get(messageQueue.getTopic());
+            if (threadPoolExecutor != null) {
+                return threadPoolExecutor;
+            }
+        }
+        return consumeExecutor;
+    }
+
     @Override
     public void updateCorePoolSize(int corePoolSize) {
         if (corePoolSize > 0
@@ -207,7 +229,7 @@ public void submitConsumeRequest(
         if (msgs.size() <= consumeBatchSize) {
             ConsumeRequest consumeRequest = new ConsumeRequest(msgs, 
processQueue, messageQueue);
             try {
-                this.consumeExecutor.submit(consumeRequest);
+                this.getConsumeExecutor(messageQueue).submit(consumeRequest);
             } catch (RejectedExecutionException e) {
                 this.submitConsumeRequestLater(consumeRequest);
             }
@@ -224,7 +246,7 @@ public void submitConsumeRequest(
 
                 ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, 
processQueue, messageQueue);
                 try {
-                    this.consumeExecutor.submit(consumeRequest);
+                    
this.getConsumeExecutor(messageQueue).submit(consumeRequest);
                 } catch (RejectedExecutionException e) {
                     for (; total < msgs.size(); total++) {
                         msgThis.add(msgs.get(total));
@@ -358,7 +380,7 @@ private void submitConsumeRequestLater(final ConsumeRequest 
consumeRequest
 
             @Override
             public void run() {
-                
ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
+                
ConsumeMessageConcurrentlyService.this.getConsumeExecutor(consumeRequest.getMessageQueue()).submit(consumeRequest);
             }
         }, 5000, TimeUnit.MILLISECONDS);
     }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 43199e5b2..0174a0a24 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -20,6 +20,8 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -62,6 +64,8 @@
     private final ScheduledExecutorService scheduledExecutorService;
     private volatile boolean stopped = false;
 
+    private final Map<String, ThreadPoolExecutor> consumeThreadExecutorMap = 
new HashMap<String, ThreadPoolExecutor>();
+
     public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl 
defaultMQPushConsumerImpl,
         MessageListenerOrderly messageListener) {
         this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
@@ -97,11 +101,30 @@ public void shutdown() {
         this.stopped = true;
         this.scheduledExecutorService.shutdown();
         this.consumeExecutor.shutdown();
+        Set<Map.Entry<String, ThreadPoolExecutor>> topicConsumeExecutors = 
consumeThreadExecutorMap.entrySet();
+        for (Map.Entry<String, ThreadPoolExecutor> topicConsumeExecutor : 
topicConsumeExecutors) {
+            topicConsumeExecutor.getValue().shutdown();
+        }
         if 
(MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) 
{
             this.unlockAllMQ();
         }
     }
 
+    @Override
+    public void setConsumeThreadPoolExecutor(String topic, ThreadPoolExecutor 
threadPoolExecutor) {
+        consumeThreadExecutorMap.put(topic, threadPoolExecutor);
+    }
+
+    private ThreadPoolExecutor getConsumeExecutor(MessageQueue messageQueue) {
+        if (consumeThreadExecutorMap != null && messageQueue != null) {
+            ThreadPoolExecutor threadPoolExecutor = 
consumeThreadExecutorMap.get(messageQueue.getTopic());
+            if (threadPoolExecutor != null) {
+                return threadPoolExecutor;
+            }
+        }
+        return consumeExecutor;
+    }
+
     public synchronized void unlockAllMQ() {
         this.defaultMQPushConsumerImpl.getRebalanceImpl().unlockAll(false);
     }
@@ -195,7 +218,7 @@ public void submitConsumeRequest(
         final boolean dispathToConsume) {
         if (dispathToConsume) {
             ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, 
messageQueue);
-            this.consumeExecutor.submit(consumeRequest);
+            this.getConsumeExecutor(messageQueue).submit(consumeRequest);
         }
     }
 
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
index 0f6f3bb38..00dca943f 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageService.java
@@ -17,6 +17,8 @@
 package org.apache.rocketmq.client.impl.consumer;
 
 import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
@@ -41,4 +43,6 @@ void submitConsumeRequest(
         final ProcessQueue processQueue,
         final MessageQueue messageQueue,
         final boolean dispathToConsume);
+
+    void setConsumeThreadPoolExecutor(String topic,ThreadPoolExecutor 
threadPoolExecutor);
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index f560376c6..9543641d9 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -27,6 +27,8 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+
 import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
@@ -109,6 +111,8 @@
     private long queueFlowControlTimes = 0;
     private long queueMaxSpanFlowControlTimes = 0;
 
+    private final Map<String, ThreadPoolExecutor> topicConsumeExecutorMap = 
new HashMap<String, ThreadPoolExecutor>();
+
     public DefaultMQPushConsumerImpl(DefaultMQPushConsumer 
defaultMQPushConsumer, RPCHook rpcHook) {
         this.defaultMQPushConsumer = defaultMQPushConsumer;
         this.rpcHook = rpcHook;
@@ -603,6 +607,11 @@ public synchronized void start() throws MQClientException {
                         new ConsumeMessageConcurrentlyService(this, 
(MessageListenerConcurrently) this.getMessageListenerInner());
                 }
 
+                Set<Entry<String, ThreadPoolExecutor>> topicConsumeExecutors = 
this.topicConsumeExecutorMap.entrySet();
+                for (Entry<String, ThreadPoolExecutor> topicConsumeExecutor : 
topicConsumeExecutors) {
+                    
this.consumeMessageService.setConsumeThreadPoolExecutor(topicConsumeExecutor.getKey(),topicConsumeExecutor.getValue());
+                }
+
                 this.consumeMessageService.start();
 
                 boolean registerOK = 
mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), 
this);
@@ -870,6 +879,11 @@ public void subscribe(String topic, String subExpression) 
throws MQClientExcepti
         }
     }
 
+    public void subscribe(String topic, String subExpression, 
ThreadPoolExecutor consumeExecutor) throws MQClientException {
+        this.subscribe(topic,subExpression);
+        this.topicConsumeExecutorMap.put(topic,consumeExecutor);
+    }
+
     public void subscribe(String topic, String fullClassName, String 
filterClassSource) throws MQClientException {
         try {
             SubscriptionData subscriptionData = 
FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
@@ -1137,6 +1151,5 @@ public ConsumeMessageService getConsumeMessageService() {
 
     public void setConsumeMessageService(ConsumeMessageService 
consumeMessageService) {
         this.consumeMessageService = consumeMessageService;
-
     }
 }
diff --git 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index b21edc919..207fe41ff 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -18,12 +18,16 @@
 
 import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
@@ -38,6 +42,7 @@
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
 import 
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
 import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
+import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
 import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
 import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
@@ -46,6 +51,7 @@
 import org.apache.rocketmq.client.impl.consumer.PullResultExt;
 import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.message.MessageClientExt;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -85,6 +91,12 @@
     private RebalancePushImpl rebalancePushImpl;
     private DefaultMQPushConsumer pushConsumer;
 
+    private boolean listenerCalled = false;
+
+    private boolean isConsumeThreadOk = false;
+
+    private ThreadPoolExecutor consumeExecutor = null;
+
     @Before
     public void init() throws Exception {
         consumerGroup = "FooBarGroup" + System.currentTimeMillis();
@@ -96,6 +108,14 @@ public void init() throws Exception {
             @Override
             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> 
msgs,
                 ConsumeConcurrentlyContext context) {
+                for (MessageExt msg : msgs) {
+                    if(msg.getTopic().equals(topic)){
+                        
if(Thread.currentThread().getName().startsWith("MyConsumeThread_")){
+                            isConsumeThreadOk = true;
+                        }
+                    }
+                }
+                listenerCalled = true;
                 return null;
             }
         });
@@ -105,7 +125,11 @@ public ConsumeConcurrentlyStatus 
consumeMessage(List<MessageExt> msgs,
         Field field = 
DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
         field.setAccessible(true);
         field.set(pushConsumerImpl, rebalancePushImpl);
-        pushConsumer.subscribe(topic, "*");
+
+        consumeExecutor = new ThreadPoolExecutor(5, 5, 1000 * 60, 
TimeUnit.MILLISECONDS,
+            new LinkedBlockingDeque<Runnable>(), new 
ThreadFactoryImpl("MyConsumeThread_"));
+        pushConsumer.subscribe(topic, "*", consumeExecutor);
+
         pushConsumer.start();
 
         mQClientFactory = spy(pushConsumerImpl.getmQClientFactory());
@@ -250,6 +274,27 @@ public void testCheckConfig() {
         }
     }
 
+    @Test
+    public void testConsumeThread() throws NoSuchMethodException, 
InvocationTargetException, IllegalAccessException {
+        if(listenerCalled&&isConsumeThreadOk){
+            return;
+        }else{
+            ConsumeMessageService consumeMessageService = 
pushConsumer.getDefaultMQPushConsumerImpl().getConsumeMessageService();
+            Method getConsumeExecutorMethod = null;
+            if(consumeMessageService instanceof 
ConsumeMessageConcurrentlyService){
+                ConsumeMessageConcurrentlyService cs = 
(ConsumeMessageConcurrentlyService)consumeMessageService;
+                getConsumeExecutorMethod = 
ConsumeMessageConcurrentlyService.class.getDeclaredMethod("getConsumeExecutor",MessageQueue.class);
+            }else {
+                getConsumeExecutorMethod = 
ConsumeMessageOrderlyService.class.getDeclaredMethod("getConsumeExecutor",MessageQueue.class);
+            }
+            getConsumeExecutorMethod.setAccessible(true);
+            MessageQueue messageQueue = new MessageQueue();
+            messageQueue.setTopic(topic);
+            ThreadPoolExecutor executor = 
(ThreadPoolExecutor)getConsumeExecutorMethod.invoke(consumeMessageService, 
messageQueue);
+            assertThat(executor).isEqualTo(consumeExecutor);
+        }
+    }
+
     private DefaultMQPushConsumer createPushConsumer() {
         DefaultMQPushConsumer pushConsumer = new 
DefaultMQPushConsumer(consumerGroup);
         pushConsumer.registerMessageListener(new MessageListenerConcurrently() 
{


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to