This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2113c16eb [RIP-48] Support reset offset in server-side for pop message
(#5457)
2113c16eb is described below
commit 2113c16ebc4fa22cb47d812cda844b84882aa55f
Author: lizhimins <[email protected]>
AuthorDate: Mon Nov 7 13:55:03 2022 +0800
[RIP-48] Support reset offset in server-side for pop message (#5457)
* [RIP-48] Support reset offset in server-side for pop message
* [RIP-48] Support reset offset in server-side for pop message
* no need remove error log
Co-authored-by: 斜阳 <[email protected]>
---
.../broker/offset/ConsumerOrderInfoManager.java | 7 +
.../broker/processor/PopMessageProcessor.java | 45 ++-
.../test/client/rmq/RMQNormalConsumer.java | 17 +-
.../rocketmq/test/client/rmq/RMQPopClient.java | 13 +-
.../rocketmq/test/client/rmq/RMQPopConsumer.java | 77 ++++-
.../rocketmq/test/factory/ConsumerFactory.java | 8 +-
.../rocketmq/test/util/MQAdminTestUtils.java | 2 +-
.../rocketmq/test/offset/OffsetResetForPopIT.java | 355 +++++++++++++++++++++
.../apache/rocketmq/tools/command/CommandUtil.java | 13 +
9 files changed, 505 insertions(+), 32 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
index 5b0bb9760..6f480f49d 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
@@ -159,6 +159,13 @@ public class ConsumerOrderInfoManager extends
ConfigManager {
return orderInfo.needBlock(invisibleTime);
}
+ /**
+  * Removes whatever is recorded under {@code queueId} for the given topic and consumer group,
+  * doing nothing when that topic/group pair has no entry in {@code table}.
+  *
+  * @param topic   topic part of the lookup key
+  * @param group   consumer group part of the lookup key
+  * @param queueId queue whose record is dropped
+  */
+ public void clearBlock(String topic, String group, int queueId) {
+     table.computeIfPresent(buildKey(topic, group), (orderKey, perQueueInfo) -> {
+         perQueueInfo.remove(queueId);
+         // Hand back the same container so the topic/group mapping itself stays in the table.
+         return perQueueInfo;
+     });
+ }
+
/**
* mark message is consumed finished. return the consumer offset
*
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 0b8801ebf..ac1dd4615 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -498,12 +498,15 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
String lockKey =
topic + PopAckConstants.SPLIT + requestHeader.getConsumerGroup() +
PopAckConstants.SPLIT + queueId;
boolean isOrder = requestHeader.isOrder();
- long offset = getPopOffset(topic, requestHeader, queueId, false,
lockKey);
+ long offset = getPopOffset(topic, requestHeader.getConsumerGroup(),
queueId, requestHeader.getInitMode(),
+ false, lockKey, false);
if (!queueLockManager.tryLock(lockKey)) {
restNum =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) -
offset + restNum;
return restNum;
}
- offset = getPopOffset(topic, requestHeader, queueId, true, lockKey);
+ offset = getPopOffset(topic, requestHeader.getConsumerGroup(),
queueId, requestHeader.getInitMode(),
+ true, lockKey, true);
+
GetMessageResult getMessageTmpResult = null;
try {
if (isOrder &&
brokerController.getConsumerOrderInfoManager().checkBlock(topic,
@@ -594,12 +597,12 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
return restNum;
}
- private long getPopOffset(String topic, PopMessageRequestHeader
requestHeader, int queueId, boolean init,
- String lockKey) {
- long offset =
this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
- topic, queueId);
+ private long getPopOffset(String topic, String group, int queueId, int
initMode, boolean init, String lockKey,
+ boolean checkResetOffset) {
+
+ long offset =
this.brokerController.getConsumerOffsetManager().queryOffset(group, topic,
queueId);
if (offset < 0) {
- if (ConsumeInitMode.MIN == requestHeader.getInitMode()) {
+ if (ConsumeInitMode.MIN == initMode) {
offset =
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
} else {
// pop last one,then commit offset.
@@ -609,17 +612,24 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
offset = 0;
}
if (init) {
-
this.brokerController.getConsumerOffsetManager().commitOffset("getPopOffset",
- requestHeader.getConsumerGroup(), topic,
- queueId, offset);
+
this.brokerController.getConsumerOffsetManager().commitOffset(
+ "getPopOffset", group, topic, queueId, offset);
}
}
}
+
+ if (checkResetOffset) {
+ Long resetOffset = resetPopOffset(topic, group, queueId);
+ if (resetOffset != null) {
+ return resetOffset;
+ }
+ }
+
long bufferOffset =
this.popBufferMergeService.getLatestOffset(lockKey);
if (bufferOffset < 0) {
return offset;
} else {
- return bufferOffset > offset ? bufferOffset : offset;
+ return Math.max(bufferOffset, offset);
}
}
@@ -734,6 +744,19 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
);
}
+ private Long resetPopOffset(String topic, String group, int queueId) {
+ String lockKey = topic + PopAckConstants.SPLIT + group +
PopAckConstants.SPLIT + queueId;
+ Long resetOffset =
+
this.brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topic,
group, queueId);
+ if (resetOffset != null) {
+
this.brokerController.getConsumerOrderInfoManager().clearBlock(topic, group,
queueId);
+ this.getPopBufferMergeService().clearOffsetQueue(lockKey);
+ this.brokerController.getConsumerOffsetManager()
+ .commitOffset("ResetPopOffset", group, topic, queueId,
resetOffset);
+ }
+ return resetOffset;
+ }
+
private byte[] readGetMessageResult(final GetMessageResult
getMessageResult, final String group, final String topic,
final int queueId) {
final ByteBuffer byteBuffer =
ByteBuffer.allocate(getMessageResult.getBufferTotalSize());
diff --git
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
index 71f908887..7cbeaa810 100644
---
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
+++
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java
@@ -25,7 +25,8 @@ import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.util.RandomUtil;
public class RMQNormalConsumer extends AbstractMQConsumer {
- private static Logger logger = Logger.getLogger(RMQNormalConsumer.class);
+
+ private static final Logger LOGGER =
Logger.getLogger(RMQNormalConsumer.class);
protected DefaultMQPushConsumer consumer = null;
public RMQNormalConsumer(String nsAddr, String topic, String subExpression,
@@ -33,18 +34,22 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
super(nsAddr, topic, subExpression, consumerGroup, listener);
}
+ @Override
public AbstractListener getListener() {
return listener;
}
+ @Override
public void setListener(AbstractListener listener) {
this.listener = listener;
}
+ @Override
public void create() {
create(false);
}
+ @Override
public void create(boolean useTLS) {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setInstanceName(RandomUtil.getStringByUUID());
@@ -53,19 +58,20 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
try {
consumer.subscribe(topic, subExpression);
} catch (MQClientException e) {
- logger.error("consumer subscribe failed!");
+ LOGGER.error("consumer subscribe failed!");
e.printStackTrace();
}
consumer.setMessageListener(listener);
consumer.setUseTLS(useTLS);
}
+ @Override
public void start() {
try {
consumer.start();
- logger.info(String.format("consumer[%s] started!",
consumer.getConsumerGroup()));
+ LOGGER.info(String.format("consumer[%s] started!",
consumer.getConsumerGroup()));
} catch (MQClientException e) {
- logger.error("consumer start failed!");
+ LOGGER.error("consumer start failed!");
e.printStackTrace();
}
}
@@ -74,11 +80,12 @@ public class RMQNormalConsumer extends AbstractMQConsumer {
try {
consumer.subscribe(topic, subExpression);
} catch (MQClientException e) {
- logger.error("consumer subscribe failed!");
+ LOGGER.error("consumer subscribe failed!");
e.printStackTrace();
}
}
+ @Override
public void shutdown() {
consumer.shutdown();
}
diff --git
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
index 558acb804..c502529ba 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
@@ -35,7 +35,9 @@ import org.apache.rocketmq.test.clientinterface.MQConsumer;
import org.apache.rocketmq.test.util.RandomUtil;
public class RMQPopClient implements MQConsumer {
+
private static final long DEFAULT_TIMEOUT = 3000;
+
private MQClientAPIImpl mqClientAPI;
@Override
@@ -50,10 +52,8 @@ public class RMQPopClient implements MQConsumer {
NettyClientConfig nettyClientConfig = new NettyClientConfig();
nettyClientConfig.setUseTLS(useTLS);
- this.mqClientAPI = new MQClientAPIImpl(nettyClientConfig,
- new ClientRemotingProcessor(null),
- null,
- clientConfig);
+ this.mqClientAPI = new MQClientAPIImpl(
+ nettyClientConfig, new ClientRemotingProcessor(null), null,
clientConfig);
}
@Override
@@ -103,8 +103,9 @@ public class RMQPopClient implements MQConsumer {
return future;
}
- public CompletableFuture<AckResult> ackMessageAsync(String brokerAddr,
String topic, String consumerGroup,
- String extraInfo) {
+ public CompletableFuture<AckResult> ackMessageAsync(
+ String brokerAddr, String topic, String consumerGroup, String
extraInfo) {
+
String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
AckMessageRequestHeader requestHeader = new AckMessageRequestHeader();
requestHeader.setTopic(ExtraInfoUtil.getRealTopic(extraInfoStrs,
topic, consumerGroup));
diff --git
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java
index 036f60e1d..49a06bb76 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopConsumer.java
@@ -17,17 +17,84 @@
package org.apache.rocketmq.test.client.rmq;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.AbstractListener;
+ /**
+  * Test-side consumer that issues pop / ack requests against an explicit broker address.
+  * <p>
+  * As written here, {@link #start()} does not delegate to the parent class; it only obtains an
+  * {@code RMQPopClient} from {@code ConsumerFactory}. The {@code client} field is unset until
+  * {@link #start()} has run, so {@link #shutdown()} and the pop/ack methods must not be called before it.
+  */
public class RMQPopConsumer extends RMQNormalConsumer {
+
+ private static final Logger log = Logger.getLogger(RMQPopConsumer.class);
+
+ // Not referenced inside this class; the two-argument pop overloads below pass a literal 5000 instead.
+ public static final long POP_TIMEOUT = 3000;
+ // Invisible time used by the two-argument pop overloads (presumably milliseconds - TODO confirm).
+ public static final long DEFAULT_INVISIBLE_TIME = 30000;
+
+ // Assigned in start(); null before that.
+ private RMQPopClient client;
+
+ // Forwarded to popMessageAsync on every pop; overridable through the six-argument constructor.
+ private int maxNum = 16;
+
+ /**
+  * Creates a pop consumer that keeps the default {@code maxNum} of 16.
+  */
public RMQPopConsumer(String nsAddr, String topic, String subExpression,
- String consumerGroup, AbstractListener listner) {
- super(nsAddr, topic, subExpression, consumerGroup, listner);
+ String consumerGroup, AbstractListener listener) {
+ super(nsAddr, topic, subExpression, consumerGroup, listener);
+ }
+
+ /**
+  * Creates a pop consumer with an explicit {@code maxNum} passed along with each pop request.
+  */
+ public RMQPopConsumer(String nsAddr, String topic, String subExpression,
+ String consumerGroup, AbstractListener listener, int maxNum) {
+ super(nsAddr, topic, subExpression, consumerGroup, listener);
+ this.maxNum = maxNum;
+ }
+
+ /**
+  * Obtains the pop client from {@code ConsumerFactory} and logs the consumer group.
+  * Does not call {@code super.start()}.
+  */
+ @Override
+ public void start() {
+ client = ConsumerFactory.getRMQPopClient();
+ log.info(String.format("consumer[%s] started!", consumerGroup));
}
+ /**
+  * Shuts down the pop client; throws {@code NullPointerException} if {@link #start()} was never called.
+  */
@Override
- public void create() {
- super.create();
- consumer.setClientRebalance(false);
+ public void shutdown() {
+ client.shutdown();
+ }
+
+ /**
+  * Pops with {@link #DEFAULT_INVISIBLE_TIME} and a timeout argument of 5000.
+  */
+ public PopResult pop(String brokerAddr, MessageQueue mq) throws Exception {
+ return this.pop(brokerAddr, mq, DEFAULT_INVISIBLE_TIME, 5000);
+ }
+
+ /**
+  * Sends one pop request through the client and blocks on the returned future.
+  * <p>
+  * {@code timeout} is only forwarded to {@code popMessageAsync}; the {@code future.get()} call itself
+  * has no time limit. The boolean after {@code ConsumeInitMode.MIN} is {@code false} here and
+  * {@code true} in {@code popOrderly} (presumably the "order" flag - TODO confirm against
+  * {@code RMQPopClient#popMessageAsync}).
+  */
+ public PopResult pop(String brokerAddr, MessageQueue mq, long
invisibleTime, long timeout)
+ throws InterruptedException, RemotingException, MQClientException,
MQBrokerException,
+ ExecutionException, TimeoutException {
+
+ CompletableFuture<PopResult> future = this.client.popMessageAsync(
+ brokerAddr, mq, invisibleTime, maxNum, consumerGroup, timeout,
true,
+ ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
+
+ return future.get();
+ }
+
+ /**
+  * Orderly variant of the two-argument pop: {@link #DEFAULT_INVISIBLE_TIME} and a timeout argument of 5000.
+  */
+ public PopResult popOrderly(String brokerAddr, MessageQueue mq) throws
Exception {
+ return this.popOrderly(brokerAddr, mq, DEFAULT_INVISIBLE_TIME, 5000);
+ }
+
+ /**
+  * Same request as {@code pop}, except that the boolean following {@code ConsumeInitMode.MIN} is
+  * {@code true}. Blocks on the returned future without a time limit.
+  */
+ public PopResult popOrderly(String brokerAddr, MessageQueue mq, long
invisibleTime, long timeout)
+ throws InterruptedException, ExecutionException {
+
+ CompletableFuture<PopResult> future = this.client.popMessageAsync(
+ brokerAddr, mq, invisibleTime, maxNum, consumerGroup, timeout,
true,
+ ConsumeInitMode.MIN, true, ExpressionType.TAG, "*");
+
+ return future.get();
+ }
+
+ /**
+  * Acks a previously popped message asynchronously, using this consumer's topic and group.
+  *
+  * @param extraInfo passed through to {@code ackMessageAsync}; callers in this commit supply the
+  *                  message's {@code PROPERTY_POP_CK} property
+  */
+ public CompletableFuture<AckResult> ackAsync(String brokerAddr, String
extraInfo) {
+ return this.client.ackMessageAsync(brokerAddr, topic, consumerGroup,
extraInfo);
}
}
diff --git
a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
index 27f5dcbdd..cdda908f6 100644
--- a/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
+++ b/test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
@@ -64,11 +64,11 @@ public class ConsumerFactory {
consumer.start();
return consumer;
}
+
public static RMQPopConsumer getRMQPopConsumer(String nsAddr, String
consumerGroup,
- String topic, String subExpression,
- AbstractListener listener) {
- RMQPopConsumer consumer = new RMQPopConsumer(nsAddr, topic,
subExpression,
- consumerGroup, listener);
+ String topic, String subExpression, AbstractListener listener) {
+
+ RMQPopConsumer consumer = new RMQPopConsumer(nsAddr, topic,
subExpression, consumerGroup, listener);
consumer.create();
consumer.start();
return consumer;
diff --git
a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
index c4d50f737..5f9f7a26e 100644
--- a/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
+++ b/test/src/main/java/org/apache/rocketmq/test/util/MQAdminTestUtils.java
@@ -77,7 +77,7 @@ public class MQAdminTestUtils {
return true;
}
- private static boolean checkTopicExist(DefaultMQAdminExt mqAdminExt,
String topic) {
+ public static boolean checkTopicExist(DefaultMQAdminExt mqAdminExt, String
topic) {
boolean createResult = false;
try {
TopicStatsTable topicInfo = mqAdminExt.examineTopicStats(topic);
diff --git
a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
new file mode 100644
index 000000000..cf17d6260
--- /dev/null
+++
b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.offset;
+
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.client.rmq.RMQPopConsumer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
+import org.apache.rocketmq.test.util.MQAdminTestUtils;
+import org.apache.rocketmq.test.util.MQRandomUtils;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+
+public class OffsetResetForPopIT extends BaseConf {
+
+ private static final Logger LOGGER =
Logger.getLogger(OffsetResetForPopIT.class);
+
+ private String topic;
+ private String group;
+ private RMQNormalProducer producer = null;
+ private RMQPopConsumer consumer = null;
+ private DefaultMQAdminExt adminExt;
+
+ @Before
+ public void setUp() throws Exception {
+ // reset pop offset rely on server side offset
+ brokerController1.getBrokerConfig().setUseServerSideResetOffset(true);
+
+ adminExt = BaseConf.getAdmin(NAMESRV_ADDR);
+ adminExt.start();
+
+ topic = MQRandomUtils.getRandomTopic();
+ this.createAndWaitTopicRegister(BROKER1_NAME, topic);
+ group = initConsumerGroup();
+ LOGGER.info(String.format("use topic: %s, group: %s", topic, group));
+ producer = getProducer(NAMESRV_ADDR, topic);
+ }
+
+ @After
+ public void tearDown() {
+ shutdown();
+ }
+
+ private void createAndWaitTopicRegister(String brokerName, String topic)
throws Exception {
+ String brokerAddress =
CommandUtil.fetchMasterAddrByBrokerName(adminExt, brokerName);
+ TopicConfig topicConfig = new TopicConfig(topic);
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ adminExt.createAndUpdateTopicConfig(brokerAddress, topicConfig);
+
+ await().atMost(30, TimeUnit.SECONDS).until(
+ () -> MQAdminTestUtils.checkTopicExist(adminExt, topic));
+ }
+
+ /**
+  * Asks broker 1 to reset the consumer offset of queue 0 for the current consumer's group and topic.
+  * <p>
+  * The call stays best-effort (a failure does not abort the calling test directly), but the
+  * failure is logged so a test that later times out can be traced back to a rejected reset.
+  *
+  * @param resetOffset offset the queue should be reset to
+  */
+ private void resetOffsetInner(long resetOffset) {
+     try {
+         // reset offset by queue
+         adminExt.resetOffsetByQueueId(brokerController1.getBrokerAddr(),
+             consumer.getConsumerGroup(), consumer.getTopic(), 0, resetOffset);
+     } catch (Exception e) {
+         LOGGER.warn(String.format("reset offset to %d failed, topic: %s, group: %s",
+             resetOffset, topic, group), e);
+     }
+ }
+
+ /**
+  * Acks one popped message on broker 1 and waits for the ack future to complete.
+  * <p>
+  * Failures are logged rather than rethrown, so callers can keep acking the remaining messages.
+  * An interrupt received while waiting is restored on the current thread.
+  *
+  * @param messageExt popped message; its {@code PROPERTY_POP_CK} property is sent as the ack handle
+  */
+ private void ackMessageSync(MessageExt messageExt) {
+     try {
+         consumer.ackAsync(brokerController1.getBrokerAddr(),
+             messageExt.getProperty(MessageConst.PROPERTY_POP_CK)).get();
+     } catch (InterruptedException e) {
+         // Keep the interrupt visible to the caller's loop instead of silently clearing it.
+         Thread.currentThread().interrupt();
+         LOGGER.warn("interrupted while waiting for ack result", e);
+     } catch (Exception e) {
+         LOGGER.error("ack message failed", e);
+     }
+ }
+
+ private void ackMessageSync(List<MessageExt> messageExtList) {
+ if (messageExtList != null) {
+ messageExtList.forEach(this::ackMessageSync);
+ }
+ }
+
+ @Test
+ public void testResetOffsetAfterPop() throws Exception {
+ int messageCount = 10;
+ int resetOffset = 4;
+ producer.send(messageCount);
+ consumer = new RMQPopConsumer(NAMESRV_ADDR, topic, "*", group, new
RMQNormalListener());
+ consumer.start();
+
+ MessageQueue mq = new MessageQueue(topic, BROKER1_NAME, 0);
+ PopResult popResult = consumer.pop(brokerController1.getBrokerAddr(),
mq);
+ Assert.assertEquals(10, popResult.getMsgFoundList().size());
+
+ resetOffsetInner(resetOffset);
+ popResult = consumer.pop(brokerController1.getBrokerAddr(), mq);
+ Assert.assertTrue(popResult != null && popResult.getMsgFoundList() !=
null);
+ Assert.assertEquals(messageCount - resetOffset,
popResult.getMsgFoundList().size());
+ }
+
+ @Test
+ public void testResetOffsetThenAckOldForPopOrderly() throws Exception {
+ int messageCount = 10;
+ int resetOffset = 2;
+ producer.send(messageCount);
+ consumer = new RMQPopConsumer(NAMESRV_ADDR, topic, "*", group, new
RMQNormalListener());
+ consumer.start();
+
+ MessageQueue mq = new MessageQueue(topic, BROKER1_NAME, 0);
+ PopResult popResult1 =
consumer.popOrderly(brokerController1.getBrokerAddr(), mq);
+ Assert.assertEquals(10, popResult1.getMsgFoundList().size());
+
+ resetOffsetInner(resetOffset);
+ ConsumeStats consumeStats = adminExt.examineConsumeStats(group, topic);
+ Assert.assertEquals(resetOffset,
consumeStats.getOffsetTable().get(mq).getConsumerOffset());
+
+ PopResult popResult2 =
consumer.popOrderly(brokerController1.getBrokerAddr(), mq);
+ Assert.assertTrue(popResult2 != null && popResult2.getMsgFoundList()
!= null);
+ Assert.assertEquals(messageCount - resetOffset,
popResult2.getMsgFoundList().size());
+
+ // ack old msg, expect has no effect
+ ackMessageSync(popResult1.getMsgFoundList());
+ Assert.assertTrue(brokerController1.getConsumerOrderInfoManager()
+ .checkBlock(topic, group, 0,
RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+
+ // ack new msg
+ ackMessageSync(popResult2.getMsgFoundList());
+ Assert.assertFalse(brokerController1.getConsumerOrderInfoManager()
+ .checkBlock(topic, group, 0,
RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+ }
+
+ @Test
+ public void testRestOffsetToSkipMsgForPopOrderly() throws Exception {
+ int messageCount = 10;
+ int resetOffset = 4;
+ producer.send(messageCount);
+ consumer = new RMQPopConsumer(NAMESRV_ADDR, topic, "*", group, new
RMQNormalListener());
+ resetOffsetInner(resetOffset);
+ consumer.start();
+
+ MessageQueue mq = new MessageQueue(topic, BROKER1_NAME, 0);
+ PopResult popResult =
consumer.popOrderly(brokerController1.getBrokerAddr(), mq);
+ Assert.assertEquals(messageCount - resetOffset,
popResult.getMsgFoundList().size());
+ Assert.assertTrue(brokerController1.getConsumerOrderInfoManager()
+ .checkBlock(topic, group, 0,
RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+
+ ackMessageSync(popResult.getMsgFoundList());
+ TimeUnit.SECONDS.sleep(1);
+ Assert.assertFalse(brokerController1.getConsumerOrderInfoManager()
+ .checkBlock(topic, group, 0,
RMQPopConsumer.DEFAULT_INVISIBLE_TIME));
+ }
+
+ @Test
+ public void testResetOffsetAfterPopWhenOpenBufferAndWait() throws
Exception {
+ int messageCount = 10;
+ int resetOffset = 4;
+ brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
+ producer.send(messageCount);
+ consumer = new RMQPopConsumer(NAMESRV_ADDR, topic, "*", group, new
RMQNormalListener());
+ consumer.start();
+
+ MessageQueue mq = new MessageQueue(topic, BROKER1_NAME, 0);
+ PopResult popResult = consumer.pop(brokerController1.getBrokerAddr(),
mq);
+ Assert.assertEquals(10, popResult.getMsgFoundList().size());
+
+ resetOffsetInner(resetOffset);
+
TimeUnit.MILLISECONDS.sleep(brokerController1.getBrokerConfig().getPopCkStayBufferTimeOut());
+
+ popResult = consumer.pop(brokerController1.getBrokerAddr(), mq);
+ Assert.assertTrue(popResult != null && popResult.getMsgFoundList() !=
null);
+ Assert.assertEquals(messageCount - resetOffset,
popResult.getMsgFoundList().size());
+ }
+
+ @Test
+ public void testResetOffsetWhilePopWhenOpenBuffer() {
+ testResetOffsetWhilePop(8, false, false, 5);
+ }
+
+ @Test
+ public void testResetOffsetWhilePopWhenOpenBufferAndAck() {
+ testResetOffsetWhilePop(8, false, true, 5);
+ }
+
+ @Test
+ public void testMultipleResetOffsetWhilePopWhenOpenBufferAndAck() {
+ testResetOffsetWhilePop(8, false, true, 3, 5);
+ }
+
+ @Test
+ public void testResetFutureOffsetWhilePopWhenOpenBufferAndAck() {
+ testResetOffsetWhilePop(2, true, true, 8);
+ }
+
+ @Test
+ public void testMultipleResetFutureOffsetWhilePopWhenOpenBufferAndAck() {
+ testResetOffsetWhilePop(2, true, true, 5, 8);
+ }
+
+ /**
+  * Pops messages one at a time on a background thread, issues the given offset reset(s) once the
+  * running total of popped messages equals {@code targetCount}, and waits up to 10 seconds for the
+  * total to reach {@code targetCount + 10 - lastResetOffset}.
+  * <p>
+  * The background worker is stopped when this method returns (normally or through an await
+  * timeout), so it cannot keep popping while later tests run.
+  *
+  * @param targetCount number of popped messages after which the reset(s) are issued
+  * @param resetFuture when {@code true}, additionally require that fewer than 10 messages were popped
+  * @param needAck     whether each popped batch is acked before the loop continues
+  * @param resetOffset offsets to reset to, applied in order; the last one drives the expected total
+  */
+ private void testResetOffsetWhilePop(int targetCount, boolean resetFuture, boolean needAck,
+     int... resetOffset) {
+     brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
+     producer.send(10);
+
+     // max pop one message per request
+     consumer =
+         new RMQPopConsumer(NAMESRV_ADDR, topic, "*", group, new RMQNormalListener(), 1);
+
+     MessageQueue mq = new MessageQueue(topic, BROKER1_NAME, 0);
+     AtomicInteger counter = new AtomicInteger(0);
+     consumer.start();
+
+     // Explicit stop flag: the ack helper may consume an interrupt, so interruption alone is not
+     // a reliable way to end the loop.
+     java.util.concurrent.atomic.AtomicBoolean stopped = new java.util.concurrent.atomic.AtomicBoolean(false);
+     java.util.concurrent.ExecutorService popExecutor = Executors.newSingleThreadExecutor();
+     try {
+         popExecutor.execute(() -> {
+             long start = System.currentTimeMillis();
+             while (!stopped.get() && System.currentTimeMillis() - start <= 30 * 1000L) {
+                 try {
+                     PopResult popResult = consumer.pop(brokerController1.getBrokerAddr(), mq);
+                     if (popResult == null || popResult.getMsgFoundList() == null) {
+                         continue;
+                     }
+
+                     int count = counter.addAndGet(popResult.getMsgFoundList().size());
+                     if (needAck) {
+                         ackMessageSync(popResult.getMsgFoundList());
+                     }
+                     if (count == targetCount) {
+                         for (int offset : resetOffset) {
+                             resetOffsetInner(offset);
+                         }
+                     }
+                 } catch (InterruptedException e) {
+                     Thread.currentThread().interrupt();
+                     return;
+                 } catch (Exception e) {
+                     LOGGER.error("pop message failed", e);
+                 }
+             }
+         });
+
+         await().atMost(10, TimeUnit.SECONDS).until(() -> {
+             boolean result = true;
+             if (resetFuture) {
+                 result = counter.get() < 10;
+             }
+             result &= counter.get() >= targetCount + 10 - resetOffset[resetOffset.length - 1];
+             return result;
+         });
+     } finally {
+         stopped.set(true);
+         // Wakes the worker if it is blocked inside a pop/ack future.
+         popExecutor.shutdownNow();
+     }
+ }
+
+ @Test
+ public void testResetFutureOffsetWhilePopOrderlyAndAck() {
+ testResetOffsetWhilePopOrderly(1,
+ Lists.newArrayList(0, 5, 6, 7, 8, 9), Lists.newArrayList(5), 6);
+ }
+
+ @Test
+ public void testMultipleResetFutureOffsetWhilePopOrderlyAndAck() {
+ testResetOffsetWhilePopOrderly(1,
+ Lists.newArrayList(0, 5, 6, 7, 8, 9), Lists.newArrayList(3, 5), 6);
+ }
+
+ @Test
+ public void testResetOffsetWhilePopOrderlyAndAck() {
+ testResetOffsetWhilePopOrderly(5,
+ Lists.newArrayList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
+ Lists.newArrayList(3), 12);
+ }
+
+ @Test
+ public void testMultipleResetOffsetWhilePopOrderlyAndAck() {
+ testResetOffsetWhilePopOrderly(5,
+ Lists.newArrayList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
+ Lists.newArrayList(3, 1), 14);
+ }
+
+ /**
+  * Sends ten messages whose bodies are the numbers 0..9, pops them orderly one at a time on a
+  * background thread (acking each), issues the given offset reset(s) once the running total of
+  * popped messages equals {@code targetCount}, and waits up to 10 seconds until both the set of
+  * received bodies and the total pop count match the expectations.
+  * <p>
+  * The background worker is stopped when this method returns, so it cannot keep popping while
+  * later tests run.
+  *
+  * @param targetCount      number of popped messages after which the reset(s) are issued
+  * @param expectMsgReceive message bodies (as integers) that must have been received, no more, no less
+  * @param resetOffset      offsets to reset to, applied in order
+  * @param expectCount      exact total number of popped messages expected, re-deliveries included
+  */
+ private void testResetOffsetWhilePopOrderly(int targetCount, List<Integer> expectMsgReceive,
+     List<Integer> resetOffset, int expectCount) {
+     brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
+     for (int i = 0; i < 10; i++) {
+         Message msg = new Message(topic,
+             String.valueOf(i).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+         producer.send(msg);
+     }
+     consumer = new RMQPopConsumer(NAMESRV_ADDR, topic, "*", group, new RMQNormalListener(), 1);
+     MessageQueue mq = new MessageQueue(topic, BROKER1_NAME, 0);
+     Set<Integer> msgReceive = Collections.newSetFromMap(new ConcurrentHashMap<>());
+     AtomicInteger counter = new AtomicInteger(0);
+     consumer.start();
+
+     // Explicit stop flag: the ack helper may consume an interrupt, so interruption alone is not
+     // a reliable way to end the loop.
+     java.util.concurrent.atomic.AtomicBoolean stopped = new java.util.concurrent.atomic.AtomicBoolean(false);
+     java.util.concurrent.ExecutorService popExecutor = Executors.newSingleThreadExecutor();
+     try {
+         popExecutor.execute(() -> {
+             long start = System.currentTimeMillis();
+             while (!stopped.get() && System.currentTimeMillis() - start <= 30 * 1000L) {
+                 try {
+                     PopResult popResult = consumer.popOrderly(brokerController1.getBrokerAddr(), mq);
+                     if (popResult == null || popResult.getMsgFoundList() == null) {
+                         continue;
+                     }
+                     int count = counter.addAndGet(popResult.getMsgFoundList().size());
+                     for (MessageExt messageExt : popResult.getMsgFoundList()) {
+                         msgReceive.add(Integer.valueOf(
+                             new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8)));
+                         ackMessageSync(messageExt);
+                     }
+                     if (count == targetCount) {
+                         for (int offset : resetOffset) {
+                             resetOffsetInner(offset);
+                         }
+                     }
+                 } catch (InterruptedException e) {
+                     Thread.currentThread().interrupt();
+                     return;
+                 } catch (Exception e) {
+                     // Keep polling, but leave a trace so a timed-out await can be diagnosed.
+                     LOGGER.warn("pop orderly failed", e);
+                 }
+             }
+         });
+
+         await().atMost(10, TimeUnit.SECONDS).until(() ->
+             expectMsgReceive.size() == msgReceive.size()
+                 && counter.get() == expectCount
+                 && msgReceive.containsAll(expectMsgReceive));
+     } finally {
+         stopped.set(true);
+         // Wakes the worker if it is blocked inside a pop/ack future.
+         popExecutor.shutdownNow();
+     }
+ }
+}
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
index a6b612070..702918196 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java
@@ -134,6 +134,19 @@ public class CommandUtil {
return brokerAddressSet;
}
+ /**
+  * Looks up the master address of one broker in the cluster info reported by {@code adminExt}.
+  *
+  * @param adminExt   admin client used to fetch the broker cluster info
+  * @param brokerName name of the broker whose master address is wanted
+  * @return the address registered under {@code MixAll.MASTER_ID} for that broker
+  * @throws Exception if fetching the cluster info fails, or if the broker name is unknown or has
+  *                   no address registered under {@code MixAll.MASTER_ID}
+  */
+ public static String fetchMasterAddrByBrokerName(final MQAdminExt adminExt,
+     final String brokerName) throws Exception {
+     ClusterInfo clusterInfoSerializeWrapper = adminExt.examineBrokerClusterInfo();
+     BrokerData brokerData = clusterInfoSerializeWrapper.getBrokerAddrTable().get(brokerName);
+     if (null != brokerData) {
+         String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
+         if (addr != null) {
+             return addr;
+         }
+     }
+     // Report the name that was asked for; brokerData is null whenever the name is unknown.
+     throw new Exception(String.format("No broker address for broker name %s.%n", brokerName));
+ }
+
public static Set<String> fetchMasterAndSlaveAddrByBrokerName(final
MQAdminExt adminExt, final String brokerName)
throws InterruptedException, RemotingConnectException,
RemotingTimeoutException,
RemotingSendRequestException, MQBrokerException {