This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9202de34c3 [ISSUE #8933] feat: DefaultPullConsumer add balance switch
(#8934)
9202de34c3 is described below
commit 9202de34c30db004db26f4976f165595af1b8bd3
Author: Humkum <[email protected]>
AuthorDate: Wed Nov 20 14:58:30 2024 +0800
[ISSUE #8933] feat: DefaultPullConsumer add balance switch (#8934)
---
.../apache/rocketmq/client/consumer/DefaultMQPullConsumer.java | 10 ++++++++++
.../client/impl/consumer/DefaultMQPullConsumerImpl.java | 7 +++++++
2 files changed, 17 insertions(+)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 7c9a65ecdb..9e7a86d9b4 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -88,6 +88,8 @@ public class DefaultMQPullConsumer extends ClientConfig
implements MQPullConsume
private int maxReconsumeTimes = 16;
+ private boolean enableRebalance = true;
+
public DefaultMQPullConsumer() {
this(MixAll.DEFAULT_CONSUMER_GROUP, null);
}
@@ -468,4 +470,12 @@ public class DefaultMQPullConsumer extends ClientConfig
implements MQPullConsume
public void persist(MessageQueue mq) {
this.getOffsetStore().persist(queueWithNamespace(mq));
}
+
+ public boolean isEnableRebalance() {
+ return enableRebalance;
+ }
+
+ public void setEnableRebalance(boolean enableRebalance) {
+ this.enableRebalance = enableRebalance;
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index 9a8ea8fb4f..e05c614c6d 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -381,6 +381,9 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
@Override
public void doRebalance() {
+ if (!defaultMQPullConsumer.isEnableRebalance()) {
+ return;
+ }
if (this.rebalanceImpl != null) {
this.rebalanceImpl.doRebalance(false);
}
@@ -388,6 +391,10 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
@Override
public boolean tryRebalance() {
+ if (!defaultMQPullConsumer.isEnableRebalance()) {
+ return true;
+ }
+
if (this.rebalanceImpl != null) {
return this.rebalanceImpl.doRebalance(false);
}