This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f872b682e6 [ISSUE #8695] fix DefaultLitePullConsumer PullThreadNums
Parameter not effective. (#8696)
f872b682e6 is described below
commit f872b682e60651c7700247ec37958a06716d7f00
Author: luozongle01 <[email protected]>
AuthorDate: Sat Sep 14 17:40:37 2024 +0800
[ISSUE #8695] fix DefaultLitePullConsumer PullThreadNums Parameter not
effective. (#8696)
---
.../client/impl/consumer/DefaultLitePullConsumerImpl.java | 13 +++++++++----
1 file changed, 9 insertions(+), 4 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index a3276cd782..3f90b67ec9 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -164,10 +164,6 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
public DefaultLitePullConsumerImpl(final DefaultLitePullConsumer
defaultLitePullConsumer, final RPCHook rpcHook) {
this.defaultLitePullConsumer = defaultLitePullConsumer;
this.rpcHook = rpcHook;
- this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
- this.defaultLitePullConsumer.getPullThreadNums(),
- new ThreadFactoryImpl("PullMsgThread-" +
this.defaultLitePullConsumer.getConsumerGroup())
- );
this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("MonitorMessageQueueChangeThread"));
this.pullTimeDelayMillsWhenException =
defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
}
@@ -293,6 +289,8 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
this.defaultLitePullConsumer.changeInstanceNameToPID();
}
+ initScheduledThreadPoolExecutor();
+
initMQClientFactory();
initRebalanceImpl();
@@ -324,6 +322,13 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
}
+ private void initScheduledThreadPoolExecutor() {
+ this.scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(
+ this.defaultLitePullConsumer.getPullThreadNums(),
+ new ThreadFactoryImpl("PullMsgThread-" +
this.defaultLitePullConsumer.getConsumerGroup())
+ );
+ }
+
private void initMQClientFactory() throws MQClientException {
this.mQClientFactory =
MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultLitePullConsumer,
this.rpcHook);
boolean registerOK =
mQClientFactory.registerConsumer(this.defaultLitePullConsumer.getConsumerGroup(),
this);