This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f872b682e6 [ISSUE #8695] fix DefaultLitePullConsumer PullThreadNums 
Parameter not effective. (#8696)
f872b682e6 is described below

commit f872b682e60651c7700247ec37958a06716d7f00
Author: luozongle01 <[email protected]>
AuthorDate: Sat Sep 14 17:40:37 2024 +0800

    [ISSUE #8695] fix DefaultLitePullConsumer PullThreadNums Parameter not 
effective. (#8696)
---
 .../client/impl/consumer/DefaultLitePullConsumerImpl.java   | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index a3276cd782..3f90b67ec9 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -164,10 +164,6 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
     public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer 
defaultLitePullConsumer, final RPCHook rpcHook) {
         this.defaultLitePullConsumer = defaultLitePullConsumer;
         this.rpcHook = rpcHook;
-        this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
-            this.defaultLitePullConsumer.getPullThreadNums(),
-            new ThreadFactoryImpl("PullMsgThread-" + 
this.defaultLitePullConsumer.getConsumerGroup())
-        );
         this.scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(new 
ThreadFactoryImpl("MonitorMessageQueueChangeThread"));
         this.pullTimeDelayMillsWhenException = 
defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
     }
@@ -293,6 +289,8 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
                     this.defaultLitePullConsumer.changeInstanceNameToPID();
                 }
 
+                initScheduledThreadPoolExecutor();
+
                 initMQClientFactory();
 
                 initRebalanceImpl();
@@ -324,6 +322,13 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
         }
     }
 
+    private void initScheduledThreadPoolExecutor() {
+        this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
+                this.defaultLitePullConsumer.getPullThreadNums(),
+                new ThreadFactoryImpl("PullMsgThread-" + 
this.defaultLitePullConsumer.getConsumerGroup())
+        );
+    }
+
     private void initMQClientFactory() throws MQClientException {
         this.mQClientFactory = 
MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer,
 this.rpcHook);
         boolean registerOK = 
mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(),
 this);

Reply via email to