This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 0b80f80 [ROCKETMQ-294] Do flow control on the number and size
dimensions when pull message
0b80f80 is described below
commit 0b80f8092ed0f3fdae7ae8c3212a4286b7f24cfc
Author: yukon <[email protected]>
AuthorDate: Wed Oct 18 10:42:04 2017 +0800
[ROCKETMQ-294] Do flow control on the number and size dimensions when pull
message
Author: yukon <[email protected]>
Closes #171 from zhouxinyu/ROCKETMQ-294.
---
.../client/consumer/DefaultMQPushConsumer.java | 58 +++++++-
.../impl/consumer/DefaultMQPushConsumerImpl.java | 58 ++++++--
.../client/impl/consumer/ProcessQueue.java | 14 +-
.../client/impl/consumer/RebalancePushImpl.java | 20 +++
.../client/consumer/DefaultMQPushConsumerTest.java | 55 +++++++
.../client/impl/consumer/ProcessQueueTest.java | 107 ++++++++++++++
.../impl/consumer/RebalancePushImplTest.java | 163 +++++++++++++++++++++
.../common/protocol/body/ProcessQueueInfo.java | 15 +-
8 files changed, 476 insertions(+), 14 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index e32edc9..d51030a 100644
---
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -166,11 +166,43 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
private int consumeConcurrentlyMaxSpan = 2000;
/**
- * Flow control threshold
+ * Flow control threshold on queue level, each message queue will cache at
most 1000 messages by default,
+ * Consider the {@code pullBatchSize}, the instantaneous value may exceed
the limit
*/
private int pullThresholdForQueue = 1000;
/**
+ * Limit the cached message size on queue level, each message queue will
cache at most 100 MiB messages by default,
+ * Consider the {@code pullBatchSize}, the instantaneous value may exceed
the limit
+ *
+ * <p>
+ * The size of a message only measured by message body, so it's not
accurate
+ */
+ private int pullThresholdSizeForQueue = 100;
+
+ /**
+ * Flow control threshold on topic level, default value is -1(Unlimited)
+ * <p>
+ * The value of {@code pullThresholdForQueue} will be overwrote and
calculated based on
+ * {@code pullThresholdForTopic} if it is't unlimited
+ * <p>
+ * For example, if the value of pullThresholdForTopic is 1000 and 10
message queues are assigned to this consumer,
+ * then pullThresholdForQueue will be set to 100
+ */
+ private int pullThresholdForTopic = -1;
+
+ /**
+ * Limit the cached message size on topic level, default value is -1
MiB(Unlimited)
+ * <p>
+ * The value of {@code pullThresholdSizeForQueue} will be overwrote and
calculated based on
+ * {@code pullThresholdSizeForTopic} if it is't unlimited
+ * <p>
+ * For example, if the value of pullThresholdSizeForTopic is 1000 MiB and
10 message queues are
+ * assigned to this consumer, then pullThresholdSizeForQueue will be set
to 100 MiB
+ */
+ private int pullThresholdSizeForTopic = -1;
+
+ /**
* Message pull Interval
*/
private long pullInterval = 0;
@@ -407,6 +439,30 @@ public class DefaultMQPushConsumer extends ClientConfig
implements MQPushConsume
this.pullThresholdForQueue = pullThresholdForQueue;
}
+ public int getPullThresholdForTopic() {
+ return pullThresholdForTopic;
+ }
+
+ public void setPullThresholdForTopic(final int pullThresholdForTopic) {
+ this.pullThresholdForTopic = pullThresholdForTopic;
+ }
+
+ public int getPullThresholdSizeForQueue() {
+ return pullThresholdSizeForQueue;
+ }
+
+ public void setPullThresholdSizeForQueue(final int
pullThresholdSizeForQueue) {
+ this.pullThresholdSizeForQueue = pullThresholdSizeForQueue;
+ }
+
+ public int getPullThresholdSizeForTopic() {
+ return pullThresholdSizeForTopic;
+ }
+
+ public void setPullThresholdSizeForTopic(final int
pullThresholdSizeForTopic) {
+ this.pullThresholdSizeForTopic = pullThresholdSizeForTopic;
+ }
+
public Map<String, String> getSubscription() {
return subscription;
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 4ba6216..72bc953 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -106,8 +106,8 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
private MessageListener messageListenerInner;
private OffsetStore offsetStore;
private ConsumeMessageService consumeMessageService;
- private long flowControlTimes1 = 0;
- private long flowControlTimes2 = 0;
+ private long queueFlowControlTimes = 0;
+ private long queueMaxSpanFlowControlTimes = 0;
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer
defaultMQPushConsumer, RPCHook rpcHook) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
@@ -219,13 +219,25 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
return;
}
- long size = processQueue.getMsgCount().get();
- if (size > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
+ long cachedMessageCount = processQueue.getMsgCount().get();
+ long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024
* 1024);
+
+ if (cachedMessageCount >
this.defaultMQPushConsumer.getPullThresholdForQueue()) {
+ this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
+ if ((queueFlowControlTimes++ % 1000) == 0) {
+ log.warn(
+ "the cached message count exceeds the threshold {}, so do
flow control, minOffset={}, maxOffset={}, count={}, size={} MiB,
pullRequest={}, flowControlTimes={}",
+ this.defaultMQPushConsumer.getPullThresholdForQueue(),
processQueue.getMsgTreeMap().firstKey(),
processQueue.getMsgTreeMap().lastKey(), cachedMessageCount,
cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
+ }
+ return;
+ }
+
+ if (cachedMessageSizeInMiB >
this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
- if ((flowControlTimes1++ % 1000) == 0) {
+ if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
- "the consumer message buffer is full, so do flow control,
minOffset={}, maxOffset={}, size={}, pullRequest={}, flowControlTimes={}",
- processQueue.getMsgTreeMap().firstKey(),
processQueue.getMsgTreeMap().lastKey(), size, pullRequest, flowControlTimes1);
+ "the cached message size exceeds the threshold {} MiB, so
do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB,
pullRequest={}, flowControlTimes={}",
+ this.defaultMQPushConsumer.getPullThresholdSizeForQueue(),
processQueue.getMsgTreeMap().firstKey(),
processQueue.getMsgTreeMap().lastKey(), cachedMessageCount,
cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
@@ -233,11 +245,11 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
if (!this.consumeOrderly) {
if (processQueue.getMaxSpan() >
this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest,
PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
- if ((flowControlTimes2++ % 1000) == 0) {
+ if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow
control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={},
flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(),
processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
- pullRequest, flowControlTimes2);
+ pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
@@ -732,6 +744,34 @@ public class DefaultMQPushConsumerImpl implements
MQConsumerInner {
null);
}
+ // pullThresholdForTopic
+ if (this.defaultMQPushConsumer.getPullThresholdForTopic() != -1) {
+ if (this.defaultMQPushConsumer.getPullThresholdForTopic() < 1 ||
this.defaultMQPushConsumer.getPullThresholdForTopic() > 6553500) {
+ throw new MQClientException(
+ "pullThresholdForTopic Out of range [1, 6553500]"
+ +
FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+ }
+
+ // pullThresholdSizeForQueue
+ if (this.defaultMQPushConsumer.getPullThresholdSizeForQueue() < 1 ||
this.defaultMQPushConsumer.getPullThresholdSizeForQueue() > 1024) {
+ throw new MQClientException(
+ "pullThresholdSizeForQueue Out of range [1, 1024]"
+ + FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+
+ if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() != -1) {
+ // pullThresholdSizeForTopic
+ if (this.defaultMQPushConsumer.getPullThresholdSizeForTopic() < 1
|| this.defaultMQPushConsumer.getPullThresholdSizeForTopic() > 102400) {
+ throw new MQClientException(
+ "pullThresholdSizeForTopic Out of range [1, 102400]"
+ +
FAQUrl.suggestTodo(FAQUrl.CLIENT_PARAMETER_CHECK_URL),
+ null);
+ }
+ }
+
// pullInterval
if (this.defaultMQPushConsumer.getPullInterval() < 0 ||
this.defaultMQPushConsumer.getPullInterval() > 65535) {
throw new MQClientException(
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 38b8073..e21dbc8 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -45,6 +45,7 @@ public class ProcessQueue {
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long,
MessageExt>();
private final AtomicLong msgCount = new AtomicLong();
+ private final AtomicLong msgSize = new AtomicLong();
private final Lock lockConsume = new ReentrantLock();
private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long,
MessageExt>();
private final AtomicLong tryUnlockTimes = new AtomicLong(0);
@@ -129,6 +130,7 @@ public class ProcessQueue {
if (null == old) {
validMsgCnt++;
this.queueOffsetMax = msg.getQueueOffset();
+ msgSize.addAndGet(msg.getBody().length);
}
}
msgCount.addAndGet(validMsgCnt);
@@ -189,6 +191,7 @@ public class ProcessQueue {
MessageExt prev =
msgTreeMap.remove(msg.getQueueOffset());
if (prev != null) {
removedCnt--;
+ msgSize.addAndGet(0 - msg.getBody().length);
}
}
msgCount.addAndGet(removedCnt);
@@ -215,6 +218,10 @@ public class ProcessQueue {
return msgCount;
}
+ public AtomicLong getMsgSize() {
+ return msgSize;
+ }
+
public boolean isDropped() {
return dropped;
}
@@ -250,7 +257,10 @@ public class ProcessQueue {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
Long offset = this.msgTreeMapTemp.lastKey();
- msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
+ msgCount.addAndGet(0 - this.msgTreeMapTemp.size());
+ for (MessageExt msg : this.msgTreeMapTemp.values()) {
+ msgSize.addAndGet(0 - msg.getBody().length);
+ }
this.msgTreeMapTemp.clear();
if (offset != null) {
return offset + 1;
@@ -334,6 +344,7 @@ public class ProcessQueue {
this.msgTreeMap.clear();
this.msgTreeMapTemp.clear();
this.msgCount.set(0);
+ this.msgSize.set(0);
this.queueOffsetMax = 0L;
} finally {
this.lockTreeMap.writeLock().unlock();
@@ -387,6 +398,7 @@ public class ProcessQueue {
info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
info.setCachedMsgMaxOffset(this.msgTreeMap.lastKey());
info.setCachedMsgCount(this.msgTreeMap.size());
+ info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 *
1024)));
}
if (!this.msgTreeMapTemp.isEmpty()) {
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 39e0251..e5166f3 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -57,6 +57,26 @@ public class RebalancePushImpl extends RebalanceImpl {
long newVersion = System.currentTimeMillis();
log.info("{} Rebalance changed, also update version: {}, {}", topic,
subscriptionData.getSubVersion(), newVersion);
subscriptionData.setSubVersion(newVersion);
+
+ int currentQueueCount = this.processQueueTable.size();
+ if (currentQueueCount != 0) {
+ int pullThresholdForTopic =
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForTopic();
+ if (pullThresholdForTopic != -1) {
+ int newVal = Math.max(1, pullThresholdForTopic /
currentQueueCount);
+ log.info("The pullThresholdForQueue is changed from {} to {}",
+
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdForQueue(),
newVal);
+
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdForQueue(newVal);
+ }
+
+ int pullThresholdSizeForTopic =
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForTopic();
+ if (pullThresholdSizeForTopic != -1) {
+ int newVal = Math.max(1, pullThresholdSizeForTopic /
currentQueueCount);
+ log.info("The pullThresholdSizeForQueue is changed from {} to
{}",
+
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getPullThresholdSizeForQueue(),
newVal);
+
this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(newVal);
+ }
+ }
+
// notify broker
this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
}
diff --git
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index 94b3f0f..b21edc9 100644
---
a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@ -32,6 +32,7 @@ import
org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
@@ -62,6 +63,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -207,6 +209,59 @@ public class DefaultMQPushConsumerTest {
assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'});
}
+ @Test
+ public void testCheckConfig() {
+ DefaultMQPushConsumer pushConsumer = createPushConsumer();
+
+ pushConsumer.setPullThresholdForQueue(65535 + 1);
+ try {
+ pushConsumer.start();
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("pullThresholdForQueue Out of
range [1, 65535]");
+ }
+
+ pushConsumer = createPushConsumer();
+ pushConsumer.setPullThresholdForTopic(65535 * 100 + 1);
+
+ try {
+ pushConsumer.start();
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("pullThresholdForTopic Out of
range [1, 6553500]");
+ }
+
+ pushConsumer = createPushConsumer();
+ pushConsumer.setPullThresholdSizeForQueue(1024 + 1);
+ try {
+ pushConsumer.start();
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("pullThresholdSizeForQueue Out
of range [1, 1024]");
+ }
+
+ pushConsumer = createPushConsumer();
+ pushConsumer.setPullThresholdSizeForTopic(1024 * 100 + 1);
+ try {
+ pushConsumer.start();
+ failBecauseExceptionWasNotThrown(MQClientException.class);
+ } catch (MQClientException e) {
+ assertThat(e).hasMessageContaining("pullThresholdSizeForTopic Out
of range [1, 102400]");
+ }
+ }
+
+ private DefaultMQPushConsumer createPushConsumer() {
+ DefaultMQPushConsumer pushConsumer = new
DefaultMQPushConsumer(consumerGroup);
+ pushConsumer.registerMessageListener(new MessageListenerConcurrently()
{
+ @Override
+ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt>
msgs,
+ ConsumeConcurrentlyContext context) {
+ return null;
+ }
+ });
+ return pushConsumer;
+ }
+
private PullRequest createPullRequest() {
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
new file mode 100644
index 0000000..d6a6dcf
--- /dev/null
+++
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.body.ProcessQueueInfo;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ProcessQueueTest {
+
+ @Test
+ public void testCachedMessageCount() {
+ ProcessQueue pq = new ProcessQueue();
+
+ pq.putMessage(createMessageList());
+
+ assertThat(pq.getMsgCount().get()).isEqualTo(100);
+
+ pq.takeMessags(10);
+ pq.commit();
+
+ assertThat(pq.getMsgCount().get()).isEqualTo(90);
+
+
pq.removeMessage(Collections.singletonList(pq.getMsgTreeMap().lastEntry().getValue()));
+ assertThat(pq.getMsgCount().get()).isEqualTo(89);
+ }
+
+ @Test
+ public void testCachedMessageSize() {
+ ProcessQueue pq = new ProcessQueue();
+
+ pq.putMessage(createMessageList());
+
+ assertThat(pq.getMsgSize().get()).isEqualTo(100 * 123);
+
+ pq.takeMessags(10);
+ pq.commit();
+
+ assertThat(pq.getMsgSize().get()).isEqualTo(90 * 123);
+
+
pq.removeMessage(Collections.singletonList(pq.getMsgTreeMap().lastEntry().getValue()));
+ assertThat(pq.getMsgSize().get()).isEqualTo(89 * 123);
+ }
+
+ @Test
+ public void testFillProcessQueueInfo() {
+ ProcessQueue pq = new ProcessQueue();
+ pq.putMessage(createMessageList(102400));
+
+ ProcessQueueInfo processQueueInfo = new ProcessQueueInfo();
+ pq.fillProcessQueueInfo(processQueueInfo);
+
+ assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(12);
+
+ pq.takeMessags(10000);
+ pq.commit();
+ pq.fillProcessQueueInfo(processQueueInfo);
+ assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(10);
+
+ pq.takeMessags(10000);
+ pq.commit();
+ pq.fillProcessQueueInfo(processQueueInfo);
+ assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(9);
+
+ pq.takeMessags(80000);
+ pq.commit();
+ pq.fillProcessQueueInfo(processQueueInfo);
+ assertThat(processQueueInfo.getCachedMsgSizeInMiB()).isEqualTo(0);
+ }
+
+ private List<MessageExt> createMessageList() {
+ return createMessageList(100);
+ }
+
+ private List<MessageExt> createMessageList(int count) {
+ List<MessageExt> messageExtList = new ArrayList<MessageExt>();
+ for (int i = 0; i < count; i++) {
+ MessageExt messageExt = new MessageExt();
+ messageExt.setQueueOffset(i);
+ messageExt.setBody(new byte[123]);
+ messageExtList.add(messageExt);
+ }
+ return messageExtList;
+ }
+}
\ No newline at end of file
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
new file mode 100644
index 0000000..796a394
--- /dev/null
+++
b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.client.consumer.store.OffsetStore;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RebalancePushImplTest {
+ @Spy
+ private DefaultMQPushConsumerImpl defaultMQPushConsumer = new
DefaultMQPushConsumerImpl(new DefaultMQPushConsumer("RebalancePushImplTest"),
null);
+ @Mock
+ private MQClientInstance mqClientInstance;
+ @Mock
+ private OffsetStore offsetStore;
+ private String consumerGroup = "CID_RebalancePushImplTest";
+ private String topic = "TopicA";
+
+ @Test
+ public void testMessageQueueChanged_CountThreshold() {
+ RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup,
MessageModel.CLUSTERING,
+ new AllocateMessageQueueAveragely(), mqClientInstance,
defaultMQPushConsumer);
+ init(rebalancePush);
+
+ // Just set pullThresholdForQueue
+
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForQueue(1024);
+ Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(1024);
+
+ // Set pullThresholdForTopic
+
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForTopic(1024);
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(512);
+
+ // Change message queue allocate result
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2));
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdForQueue()).isEqualTo(341);
+ }
+
+ private void doRebalanceForcibly(RebalancePushImpl rebalancePush,
Set<MessageQueue> allocateResultSet) {
+ rebalancePush.topicSubscribeInfoTable.put(topic, allocateResultSet);
+ rebalancePush.doRebalance(false);
+ rebalancePush.messageQueueChanged(topic, allocateResultSet,
allocateResultSet);
+ }
+
+ private void init(final RebalancePushImpl rebalancePush) {
+ rebalancePush.getSubscriptionInner().putIfAbsent(topic, new
SubscriptionData());
+
+ rebalancePush.subscriptionInner.putIfAbsent(topic, new
SubscriptionData());
+
+ when(mqClientInstance.findConsumerIdList(anyString(),
anyString())).thenReturn(Collections.singletonList(consumerGroup));
+ when(mqClientInstance.getClientId()).thenReturn(consumerGroup);
+ when(defaultMQPushConsumer.getOffsetStore()).thenReturn(offsetStore);
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(final InvocationOnMock invocation) throws
Throwable {
+ return null;
+ }
+
}).when(defaultMQPushConsumer).executePullRequestImmediately(any(PullRequest.class));
+ }
+
+ @Test
+ public void testMessageQueueChanged_SizeThreshold() {
+ RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup,
MessageModel.CLUSTERING,
+ new AllocateMessageQueueAveragely(), mqClientInstance,
defaultMQPushConsumer);
+ init(rebalancePush);
+
+ // Just set pullThresholdSizeForQueue
+
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(1024);
+ Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(1024);
+
+ // Set pullThresholdSizeForTopic
+
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForTopic(1024);
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(512);
+
+ // Change message queue allocate result
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2));
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+
assertThat(defaultMQPushConsumer.getDefaultMQPushConsumer().getPullThresholdSizeForQueue()).isEqualTo(341);
+ }
+
+ @Test
+ public void testMessageQueueChanged_ConsumerRuntimeInfo() throws
MQClientException {
+ RebalancePushImpl rebalancePush = new RebalancePushImpl(consumerGroup,
MessageModel.CLUSTERING,
+ new AllocateMessageQueueAveragely(), mqClientInstance,
defaultMQPushConsumer);
+ init(rebalancePush);
+
+
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForQueue(1024);
+
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForQueue(1024);
+ Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 0));
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 1));
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+
+ defaultMQPushConsumer.setConsumeMessageService(new
ConsumeMessageConcurrentlyService(defaultMQPushConsumer, null));
+
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("1024");
+
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("1024");
+
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("-1");
+
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("-1");
+
+
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdSizeForTopic(1024);
+
defaultMQPushConsumer.getDefaultMQPushConsumer().setPullThresholdForTopic(1024);
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("512");
+
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("512");
+
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("1024");
+
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("1024");
+
+ // Change message queue allocate result
+ allocateResultSet.add(new MessageQueue(topic, "BrokerA", 2));
+ doRebalanceForcibly(rebalancePush, allocateResultSet);
+
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForQueue")).isEqualTo("341");
+
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForQueue")).isEqualTo("341");
+
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdSizeForTopic")).isEqualTo("1024");
+
assertThat(defaultMQPushConsumer.consumerRunningInfo().getProperties().get("pullThresholdForTopic")).isEqualTo("1024");
+ }
+}
\ No newline at end of file
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java
index e2e9943..6b220b8 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ProcessQueueInfo.java
@@ -25,6 +25,7 @@ public class ProcessQueueInfo {
private long cachedMsgMinOffset;
private long cachedMsgMaxOffset;
private int cachedMsgCount;
+ private int cachedMsgSizeInMiB;
private long transactionMsgMinOffset;
private long transactionMsgMaxOffset;
@@ -142,16 +143,24 @@ public class ProcessQueueInfo {
this.lastConsumeTimestamp = lastConsumeTimestamp;
}
+ public int getCachedMsgSizeInMiB() {
+ return cachedMsgSizeInMiB;
+ }
+
+ public void setCachedMsgSizeInMiB(final int cachedMsgSizeInMiB) {
+ this.cachedMsgSizeInMiB = cachedMsgSizeInMiB;
+ }
+
@Override
public String toString() {
return "ProcessQueueInfo [commitOffset=" + commitOffset + ",
cachedMsgMinOffset="
- + cachedMsgMinOffset + ", cachedMsgMaxOffset=" +
cachedMsgMaxOffset + ", cachedMsgCount="
- + cachedMsgCount + ", transactionMsgMinOffset=" +
transactionMsgMinOffset
+ + cachedMsgMinOffset + ", cachedMsgMaxOffset=" + cachedMsgMaxOffset
+ + ", cachedMsgCount=" + cachedMsgCount + ", cachedMsgSizeInMiB=" +
cachedMsgSizeInMiB
+ + ", transactionMsgMinOffset=" + transactionMsgMinOffset
+ ", transactionMsgMaxOffset=" + transactionMsgMaxOffset + ",
transactionMsgCount="
+ transactionMsgCount + ", locked=" + locked + ", tryUnlockTimes="
+ tryUnlockTimes
+ ", lastLockTimestamp=" +
UtilAll.timeMillisToHumanString(lastLockTimestamp) + ", droped="
+ droped + ", lastPullTimestamp=" +
UtilAll.timeMillisToHumanString(lastPullTimestamp)
+ ", lastConsumeTimestamp=" +
UtilAll.timeMillisToHumanString(lastConsumeTimestamp) + "]";
-
}
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].