This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 59e8f9b66e [ISSUE #7644] Optimize client rebalance
59e8f9b66e is described below
commit 59e8f9b66ede2f02ec40a0c58fd5e5c2bd6d59e5
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Wed Dec 27 10:42:50 2023 +0800
[ISSUE #7644] Optimize client rebalance
---
.../consumer/ConsumeMessageOrderlyService.java | 4 +-
.../impl/consumer/DefaultLitePullConsumerImpl.java | 8 +++
.../impl/consumer/DefaultMQPullConsumerImpl.java | 8 +++
.../impl/consumer/DefaultMQPushConsumerImpl.java | 9 +++
.../client/impl/consumer/MQConsumerInner.java | 2 +
.../client/impl/consumer/ProcessQueue.java | 8 +--
.../client/impl/consumer/RebalanceImpl.java | 10 ++-
.../client/impl/consumer/RebalancePushImpl.java | 72 +++++++++++-----------
.../client/impl/consumer/RebalanceService.java | 17 ++++-
.../client/impl/factory/MQClientInstance.java | 9 ++-
10 files changed, 97 insertions(+), 50 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index 4246768d40..cab4fe5d69 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -488,7 +488,7 @@ public class ConsumeMessageOrderlyService implements
ConsumeMessageService {
ConsumeReturnType returnType =
ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
- this.processQueue.getConsumeLock().lock();
+
this.processQueue.getConsumeLock().readLock().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message
queue not be able to consume, because it's dropped. {}",
this.messageQueue);
@@ -504,7 +504,7 @@ public class ConsumeMessageOrderlyService implements
ConsumeMessageService {
messageQueue), e);
hasException = true;
} finally {
- this.processQueue.getConsumeLock().unlock();
+
this.processQueue.getConsumeLock().readLock().unlock();
}
if (null == status
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 20ca477008..9350970a07 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -1121,6 +1121,14 @@ public class DefaultLitePullConsumerImpl implements
MQConsumerInner {
}
}
+ @Override
+ public boolean tryRebalance() {
+ if (this.rebalanceImpl != null) {
+ return this.rebalanceImpl.doRebalance(false);
+ }
+ return false;
+ }
+
@Override
public void persistConsumerOffset() {
try {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index e6d148c7f6..f5d326071d 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -386,6 +386,14 @@ public class DefaultMQPullConsumerImpl implements
MQConsumerInner {
}
}
+ @Override
+ public boolean tryRebalance() {
+ if (this.rebalanceImpl != null) {
+ return this.rebalanceImpl.doRebalance(false);
+ }
+ return false;
+ }
+
@Override
public void persistConsumerOffset() {
try {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 15563a4f0e..d2faed3783 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -417,6 +417,7 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
// removeProcessQueue will also remove
offset to cancel the frozen status.
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
+
DefaultMQPushConsumerImpl.this.rebalanceImpl.getmQClientFactory().rebalanceImmediately();
log.warn("fix the pull request offset,
{}", pullRequest);
} catch (Throwable e) {
@@ -1375,6 +1376,14 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
}
}
+ @Override
+ public boolean tryRebalance() {
+ if (!this.pause) {
+ return this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
+ }
+ return false;
+ }
+
@Override
public void persistConsumerOffset() {
try {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
index 7e84b508b1..8fc1cc9059 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
@@ -40,6 +40,8 @@ public interface MQConsumerInner {
void doRebalance();
+ boolean tryRebalance();
+
void persistConsumerOffset();
void updateTopicSubscribeInfo(final String topic, final Set<MessageQueue>
info);
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index ab94a98467..ebc208a8d8 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -22,18 +22,16 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.protocol.body.ProcessQueueInfo;
/**
* Queue consumption snapshot
@@ -48,7 +46,7 @@ public class ProcessQueue {
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<>();
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();
- private final Lock consumeLock = new ReentrantLock();
+ private final ReadWriteLock consumeLock = new ReentrantReadWriteLock();
/**
* A subset of msgTreeMap, will only be used when orderly consume
*/
@@ -392,7 +390,7 @@ public class ProcessQueue {
this.lastLockTimestamp = lastLockTimestamp;
}
- public Lock getConsumeLock() {
+ public ReadWriteLock getConsumeLock() {
return consumeLock;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 97d9460f82..53addc5f50 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -242,9 +242,15 @@ public abstract class RebalanceImpl {
final String topic = entry.getKey();
try {
if (!clientRebalance(topic) && tryQueryAssignment(topic)) {
- balanced = this.getRebalanceResultFromBroker(topic,
isOrder);
+ boolean result =
this.getRebalanceResultFromBroker(topic, isOrder);
+ if (!result) {
+ balanced = false;
+ }
} else {
- balanced = this.rebalanceByTopic(topic, isOrder);
+ boolean result = this.rebalanceByTopic(topic, isOrder);
+ if (!result) {
+ balanced = false;
+ }
}
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index f9cf429c69..f28890d306 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -91,32 +91,47 @@ public class RebalancePushImpl extends RebalanceImpl {
}
@Override
- public boolean removeUnnecessaryMessageQueue(MessageQueue mq, ProcessQueue
pq) {
- this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
- this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
+ public boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final
ProcessQueue pq) {
if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
&&
MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
- try {
- if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
- try {
- return this.unlockDelay(mq, pq);
- } finally {
- pq.getConsumeLock().unlock();
- }
- } else {
- log.warn("[WRONG]mq is consuming, so can not unlock it,
{}. maybe hanged for a while, {}",
- mq,
- pq.getTryUnlockTimes());
- pq.incTryUnlockTimes();
+ // commit offset immediately
+ this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
+
+ // remove order message queue: unlock & remove
+ return tryRemoveOrderMessageQueue(mq, pq);
+ } else {
+ this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
+ this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
+ return true;
+ }
+ }
+
+ private boolean tryRemoveOrderMessageQueue(final MessageQueue mq, final
ProcessQueue pq) {
+ try {
+ // unlock & remove when no message is consuming or
UNLOCK_DELAY_TIME_MILLS timeout (Backwards compatibility)
+ boolean forceUnlock = pq.isDropped() && System.currentTimeMillis()
> pq.getLastLockTimestamp() + UNLOCK_DELAY_TIME_MILLS;
+ if (forceUnlock || pq.getConsumeLock().writeLock().tryLock(500,
TimeUnit.MILLISECONDS)) {
+ try {
+
RebalancePushImpl.this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
+
RebalancePushImpl.this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
+
+ pq.setLocked(false);
+ RebalancePushImpl.this.unlock(mq, true);
+ return true;
+ } finally {
+ if (!forceUnlock) {
+ pq.getConsumeLock().writeLock().unlock();
+ }
}
- } catch (Exception e) {
- log.error("removeUnnecessaryMessageQueue Exception", e);
+ } else {
+ pq.incTryUnlockTimes();
}
-
- return false;
+ } catch (Exception e) {
+ pq.incTryUnlockTimes();
}
- return true;
+
+ return false;
}
@Override
@@ -129,23 +144,6 @@ public class RebalancePushImpl extends RebalanceImpl {
return true;
}
- private boolean unlockDelay(final MessageQueue mq, final ProcessQueue pq) {
-
- if (pq.hasTempMessage()) {
- log.info("[{}]unlockDelay, begin {} ", mq.hashCode(), mq);
-
this.defaultMQPushConsumerImpl.getmQClientFactory().getScheduledExecutorService().schedule(new
Runnable() {
- @Override
- public void run() {
- log.info("[{}]unlockDelay, execute at once {}",
mq.hashCode(), mq);
- RebalancePushImpl.this.unlock(mq, true);
- }
- }, UNLOCK_DELAY_TIME_MILLS, TimeUnit.MILLISECONDS);
- } else {
- this.unlock(mq, true);
- }
- return true;
- }
-
@Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_PASSIVELY;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
index 56f589d519..8e586c85fe 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceService.java
@@ -25,8 +25,12 @@ public class RebalanceService extends ServiceThread {
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
+ private static long minInterval =
+ Long.parseLong(System.getProperty(
+ "rocketmq.client.rebalance.minInterval", "1000"));
private final Logger log = LoggerFactory.getLogger(RebalanceService.class);
private final MQClientInstance mqClientFactory;
+ private long lastRebalanceTimestamp = System.currentTimeMillis();
public RebalanceService(MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
@@ -36,9 +40,18 @@ public class RebalanceService extends ServiceThread {
public void run() {
log.info(this.getServiceName() + " service started");
+ long realWaitInterval = waitInterval;
while (!this.isStopped()) {
- this.waitForRunning(waitInterval);
- this.mqClientFactory.doRebalance();
+ this.waitForRunning(realWaitInterval);
+
+ long interval = System.currentTimeMillis() -
lastRebalanceTimestamp;
+ if (interval < minInterval) {
+ realWaitInterval = minInterval - interval;
+ } else {
+ boolean balanced = this.mqClientFactory.doRebalance();
+ realWaitInterval = balanced ? waitInterval : minInterval;
+ lastRebalanceTimestamp = System.currentTimeMillis();
+ }
}
log.info(this.getServiceName() + " service end");
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index ad39372d35..436782efd3 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -1060,17 +1060,22 @@ public class MQClientInstance {
this.rebalanceService.wakeup();
}
- public void doRebalance() {
+ public boolean doRebalance() {
+ boolean balanced = true;
for (Map.Entry<String, MQConsumerInner> entry :
this.consumerTable.entrySet()) {
MQConsumerInner impl = entry.getValue();
if (impl != null) {
try {
- impl.doRebalance();
+ if (!impl.tryRebalance()) {
+ balanced = false;
+ }
} catch (Throwable e) {
log.error("doRebalance exception", e);
}
}
}
+
+ return balanced;
}
public MQProducerInner selectProducer(final String group) {