This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d663bcf6a [ISSUE #5292] [RIP-48] Support reset offset in server-side to improve the success rate (#5293)
d663bcf6a is described below
commit d663bcf6aa9e171f079e9bd85d65b1d907f4383b
Author: lizhimins <[email protected]>
AuthorDate: Fri Oct 14 14:02:08 2022 +0800
[ISSUE #5292] [RIP-48] Support reset offset in server-side to improve the success rate (#5293)
* [RIP-48] Support reset offset in server side to improve the success rate
* [RIP-48] Support reset offset in server side to improve the success rate
* [RIP-48] Fix code style
* fix unit test
* no need put offset in reset command header
Co-authored-by: 斜阳 <[email protected]>
---
.../broker/offset/ConsumerOffsetManager.java | 69 +++++++-
.../broker/offset/LmqConsumerOffsetManager.java | 2 +-
.../broker/processor/AdminBrokerProcessor.java | 103 ++++++++++++
.../broker/processor/ConsumerManageProcessor.java | 57 +++++--
.../broker/processor/PullMessageProcessor.java | 32 +++-
.../subscription/SubscriptionGroupManager.java | 10 ++
.../broker/offset/ConsumerOffsetManagerTest.java | 33 +++-
.../rocketmq/client/impl/MQClientAPIImpl.java | 36 +++++
.../org/apache/rocketmq/common/BrokerConfig.java | 10 ++
.../common/protocol/body/ResetOffsetBody.java | 6 +
.../protocol/header/ResetOffsetRequestHeader.java | 24 +++
.../apache/rocketmq/store/GetMessageStatus.java | 2 +
.../rocketmq/test/listener/AbstractListener.java | 41 ++---
.../listener/rmq/concurrent/RMQNormalListener.java | 9 +-
.../org/apache/rocketmq/test/base/BaseConf.java | 17 +-
.../apache/rocketmq/test/offset/OffsetResetIT.java | 174 +++++++++++++++++++++
.../tools/admin/DefaultMQAdminExtImpl.java | 13 ++
.../command/offset/ResetOffsetByTimeCommand.java | 61 ++++----
.../offset/ResetOffsetByTimeOldCommand.java | 18 ++-
19 files changed, 625 insertions(+), 92 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index 02509f60d..5522d232c 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.broker.offset;
+import com.google.common.base.Strings;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -41,12 +42,15 @@ public class ConsumerOffsetManager extends ConfigManager {
private DataVersion dataVersion = new DataVersion();
- protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer,
Long>> offsetTable =
+ private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer,
Long>> offsetTable =
+ new ConcurrentHashMap<>(512);
+
+ private final ConcurrentMap<String, ConcurrentMap<Integer, Long>>
resetOffsetTable =
new ConcurrentHashMap<>(512);
protected transient BrokerController brokerController;
- private transient AtomicLong versionChangeCounter = new AtomicLong(0);
+ private final transient AtomicLong versionChangeCounter = new
AtomicLong(0);
public ConsumerOffsetManager() {
}
@@ -204,6 +208,14 @@ public class ConsumerOffsetManager extends ConfigManager {
public long queryOffset(final String group, final String topic, final int
queueId) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
+
+ if
(this.brokerController.getBrokerConfig().isUseServerSideResetOffset()) {
+ Map<Integer, Long> reset = resetOffsetTable.get(key);
+ if (null != reset && reset.containsKey(queueId)) {
+ return reset.get(queueId);
+ }
+ }
+
ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
if (null != map) {
Long offset = map.get(queueId);
@@ -215,6 +227,7 @@ public class ConsumerOffsetManager extends ConfigManager {
return -1;
}
+ @Override
public String encode() {
return this.encode(false);
}
@@ -229,7 +242,7 @@ public class ConsumerOffsetManager extends ConfigManager {
if (jsonString != null) {
ConsumerOffsetManager obj =
RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
if (obj != null) {
- this.offsetTable = obj.offsetTable;
+ this.setOffsetTable(obj.getOffsetTable());
this.dataVersion = obj.dataVersion;
}
}
@@ -244,7 +257,7 @@ public class ConsumerOffsetManager extends ConfigManager {
return offsetTable;
}
- public void setOffsetTable(ConcurrentHashMap<String,
ConcurrentMap<Integer, Long>> offsetTable) {
+ public void setOffsetTable(ConcurrentMap<String, ConcurrentMap<Integer,
Long>> offsetTable) {
this.offsetTable = offsetTable;
}
@@ -318,7 +331,55 @@ public class ConsumerOffsetManager extends ConfigManager {
}
}
}
+ }
+
+ public void assignResetOffset(String topic, String group, int queueId,
long offset) {
+ if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) ||
queueId < 0 || offset < 0) {
+ LOG.warn("Illegal arguments when assigning reset offset. Topic={},
group={}, queueId={}, offset={}",
+ topic, group, queueId, offset);
+ return;
+ }
+
+ String key = topic + TOPIC_GROUP_SEPARATOR + group;
+ ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
+ if (null == map) {
+ map = new ConcurrentHashMap<Integer, Long>();
+ ConcurrentMap<Integer, Long> previous =
resetOffsetTable.putIfAbsent(key, map);
+ if (null != previous) {
+ map = previous;
+ }
+ }
+ map.put(queueId, offset);
+ LOG.debug("Reset offset OK. Topic={}, group={}, queueId={},
resetOffset={}",
+ topic, group, queueId, offset);
+
+ // Two things are important here:
+ // 1, currentOffsetMap might be null if there is no previous records;
+ // 2, Our overriding here may get overridden by the client instantly
in concurrent cases; But it still makes
+ // sense in cases like clients are offline.
+ ConcurrentMap<Integer, Long> currentOffsetMap = offsetTable.get(key);
+ if (null != currentOffsetMap) {
+ currentOffsetMap.put(queueId, offset);
+ }
}
+ public boolean hasOffsetReset(String topic, String group, int queueId) {
+ String key = topic + TOPIC_GROUP_SEPARATOR + group;
+ ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
+ if (null == map) {
+ return false;
+ }
+ return map.containsKey(queueId);
+ }
+
+ public Long queryThenEraseResetOffset(String topic, String group, Integer
queueId) {
+ String key = topic + TOPIC_GROUP_SEPARATOR + group;
+ ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
+ if (null == map) {
+ return null;
+ } else {
+ return map.remove(queueId);
+ }
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
index ec730d38b..ce70b1a82 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
@@ -92,7 +92,7 @@ public class LmqConsumerOffsetManager extends
ConsumerOffsetManager {
if (jsonString != null) {
LmqConsumerOffsetManager obj =
RemotingSerializable.fromJson(jsonString, LmqConsumerOffsetManager.class);
if (obj != null) {
- super.offsetTable = obj.offsetTable;
+ super.setOffsetTable(obj.getOffsetTable());
this.lmqOffsetTable = obj.lmqOffsetTable;
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index b7ee62c9b..a165add40 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -48,6 +48,7 @@ import org.apache.rocketmq.broker.controller.ReplicasManager;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.common.protocol.body.ProducerTableInfo;
+import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
import
org.apache.rocketmq.common.protocol.header.GetAllProducerInfoRequestHeader;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
@@ -1633,6 +1634,16 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
LOGGER.info("[reset-offset] reset offset started by {}. topic={},
group={}, timestamp={}, isForce={}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getTopic(), requestHeader.getGroup(),
requestHeader.getTimestamp(), requestHeader.isForce());
+
+ if
(this.brokerController.getBrokerConfig().isUseServerSideResetOffset()) {
+ String topic = requestHeader.getTopic();
+ String group = requestHeader.getGroup();
+ int queueId = requestHeader.getQueueId();
+ long timestamp = requestHeader.getTimestamp();
+ Long offset = requestHeader.getOffset();
+ return resetOffsetInner(topic, group, queueId, timestamp, offset);
+ }
+
boolean isC = false;
LanguageCode language = request.getLanguage();
switch (language) {
@@ -1644,6 +1655,98 @@ public class AdminBrokerProcessor implements
NettyRequestProcessor {
requestHeader.getTimestamp(), requestHeader.isForce(), isC);
}
+ private Long searchOffsetByTimestamp(String topic, int queueId, long
timestamp) {
+ if (timestamp < 0) {
+ return
brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
+ } else {
+ return
brokerController.getMessageStore().getOffsetInQueueByTime(topic, queueId,
timestamp);
+ }
+ }
+
+ /**
+ * Reset consumer offset.
+ *
+ * @param topic Required, not null.
+ * @param group Required, not null.
+ * @param queueId if target queue ID is negative, all message queues
will be reset;
+ * otherwise, only the target queue would get reset.
+ * @param timestamp if timestamp is negative, offset would be reset to
broker offset at the time being;
+ * otherwise, binary search is performed to locate
target offset.
+ * @param offset Target offset to reset to if target queue ID is
properly provided.
+ * @return Affected queues and their new offset
+ */
+ private RemotingCommand resetOffsetInner(String topic, String group, int
queueId, long timestamp, Long offset) {
+ RemotingCommand response =
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
+
+ if (BrokerRole.SLAVE ==
brokerController.getMessageStoreConfig().getBrokerRole()) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("Can not reset offset in slave broker");
+ return response;
+ }
+
+ Map<Integer, Long> queueOffsetMap = new HashMap<>();
+
+ // Reset offset for all queues belonging to the specified topic
+ TopicConfig topicConfig =
brokerController.getTopicConfigManager().getTopicConfigTable().get(topic);
+ if (null == topicConfig) {
+ response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+ response.setRemark("Topic " + topic + " does not exist");
+ LOGGER.warn("Reset offset failed, topic does not exist. topic={},
group={}", topic, group);
+ return response;
+ }
+
+ if
(!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(group))
{
+ response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
+ response.setRemark("Group " + group + " does not exist");
+ LOGGER.warn("Reset offset failed, group does not exist. topic={},
group={}", topic, group);
+ return response;
+ }
+
+ if (queueId >= 0) {
+ if (null != offset && -1 != offset) {
+ long min =
brokerController.getMessageStore().getMinOffsetInQueue(topic, queueId);
+ long max =
brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
+ if (min >= 0 && offset < min || offset > max + 1) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark(
+ String.format("Target offset %d not in consume queue
range [%d-%d]", offset, min, max));
+ return response;
+ }
+ } else {
+ offset = searchOffsetByTimestamp(topic, queueId, timestamp);
+ }
+ queueOffsetMap.put(queueId, offset);
+ } else {
+ for (int index = 0; index < topicConfig.getReadQueueNums();
index++) {
+ offset = searchOffsetByTimestamp(topic, index, timestamp);
+ queueOffsetMap.put(index, offset);
+ }
+ }
+
+ if (queueOffsetMap.isEmpty()) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("No queues to reset.");
+ LOGGER.warn("Reset offset aborted: no queues to reset");
+ return response;
+ }
+
+ for (Map.Entry<Integer, Long> entry : queueOffsetMap.entrySet()) {
+ brokerController.getConsumerOffsetManager()
+ .assignResetOffset(topic, group, entry.getKey(),
entry.getValue());
+ }
+
+ // Prepare reset result.
+ ResetOffsetBody body = new ResetOffsetBody();
+ String brokerName = brokerController.getBrokerConfig().getBrokerName();
+ for (Map.Entry<Integer, Long> entry : queueOffsetMap.entrySet()) {
+ body.getOffsetTable().put(new MessageQueue(topic, brokerName,
entry.getKey()), entry.getValue());
+ }
+
+ LOGGER.info("Reset offset, topic={}, group={}, queues={}", topic,
group, body.toJson(false));
+ response.setBody(body.encode());
+ return response;
+ }
+
public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final GetConsumerStatusRequestHeader requestHeader =
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 0959853cc..a9ceb32df 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
@@ -143,27 +144,61 @@ public class ConsumerManageProcessor implements
NettyRequestProcessor {
private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx,
RemotingCommand request)
throws RemotingCommandException {
+
final RemotingCommand response =
RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
+
final UpdateConsumerOffsetRequestHeader requestHeader =
- (UpdateConsumerOffsetRequestHeader) request
-
.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
- TopicQueueMappingContext mappingContext =
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
+ (UpdateConsumerOffsetRequestHeader)
+
request.decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
- RemotingCommand rewriteResult =
rewriteRequestForStaticTopic(requestHeader, mappingContext);
+ TopicQueueMappingContext mappingContext =
+
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
+
+ RemotingCommand rewriteResult =
rewriteRequestForStaticTopic(requestHeader, mappingContext);
if (rewriteResult != null) {
return rewriteResult;
}
- if
(this.brokerController.getTopicConfigManager().containsTopic(requestHeader.getTopic()))
{
-
this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getConsumerGroup(),
- requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getCommitOffset());
- response.setCode(ResponseCode.SUCCESS);
- response.setRemark(null);
- } else {
+
+ String topic = requestHeader.getTopic();
+ String group = requestHeader.getConsumerGroup();
+ Integer queueId = requestHeader.getQueueId();
+ Long offset = requestHeader.getCommitOffset();
+
+ if
(!this.brokerController.getTopicConfigManager().containsTopic(requestHeader.getTopic()))
{
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
- response.setRemark("Topic " + requestHeader.getTopic() + " not
exist!");
+ response.setRemark("Topic " + topic + " not exist!");
+ return response;
+ }
+
+ if (queueId == null) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("QueueId is null, topic is " + topic);
+ return response;
+ }
+
+ if (offset == null) {
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("Offset is null, topic is " + topic);
+ return response;
+ }
+
+ ConsumerOffsetManager consumerOffsetManager =
brokerController.getConsumerOffsetManager();
+ if
(this.brokerController.getBrokerConfig().isUseServerSideResetOffset()) {
+ // Note, ignoring this update offset request
+ if (consumerOffsetManager.hasOffsetReset(topic, group, queueId)) {
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark("Offset has been previously reset");
+ LOGGER.info("Update consumer offset is rejected because of
previous offset-reset. Group={}, " +
+ "Topic={}, QueueId={}, Offset={}", group, topic, queueId,
offset);
+ return response;
+ }
}
+ this.brokerController.getConsumerOffsetManager().commitOffset(
+ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), group,
topic, queueId, offset);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
return response;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 989bc3124..700ce55d7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -66,6 +66,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageFilter;
+import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
@@ -303,7 +304,8 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
if
(!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission()))
{
response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark(String.format("the broker[%s] pulling message
is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));
+ response.setRemark(String.format("the broker[%s] pulling message
is forbidden",
+ this.brokerController.getBrokerConfig().getBrokerIP1()));
return response;
}
@@ -462,9 +464,26 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
this.brokerController.getConsumerFilterManager());
}
- final GetMessageResult getMessageResult =
-
this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
- requestHeader.getQueueId(), requestHeader.getQueueOffset(),
requestHeader.getMaxMsgNums(), messageFilter);
+ final MessageStore messageStore = brokerController.getMessageStore();
+ final boolean useResetOffsetFeature =
brokerController.getBrokerConfig().isUseServerSideResetOffset();
+ String topic = requestHeader.getTopic();
+ String group = requestHeader.getConsumerGroup();
+ int queueId = requestHeader.getQueueId();
+ Long resetOffset =
brokerController.getConsumerOffsetManager().queryThenEraseResetOffset(topic,
group, queueId);
+
+ GetMessageResult getMessageResult;
+ if (useResetOffsetFeature && null != resetOffset) {
+ getMessageResult = new GetMessageResult();
+ getMessageResult.setStatus(GetMessageStatus.OFFSET_RESET);
+ getMessageResult.setNextBeginOffset(resetOffset);
+
getMessageResult.setMinOffset(messageStore.getMinOffsetInQueue(topic, queueId));
+
getMessageResult.setMaxOffset(messageStore.getMaxOffsetInQueue(topic, queueId));
+ getMessageResult.setSuggestPullingFromSlave(false);
+ } else {
+ getMessageResult = messageStore.getMessage(
+ group, topic, queueId, requestHeader.getQueueOffset(),
requestHeader.getMaxMsgNums(), messageFilter);
+ }
+
if (getMessageResult != null) {
response.setRemark(getMessageResult.getStatus().name());
responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
@@ -512,6 +531,11 @@ public class PullMessageProcessor implements
NettyRequestProcessor {
case OFFSET_OVERFLOW_ONE:
response.setCode(ResponseCode.PULL_NOT_FOUND);
break;
+ case OFFSET_RESET:
+ response.setCode(ResponseCode.PULL_OFFSET_MOVED);
+ LOGGER.info("The queue under pulling was previously reset
to start from {}",
+ getMessageResult.getNextBeginOffset());
+ break;
case OFFSET_TOO_SMALL:
response.setCode(ResponseCode.PULL_OFFSET_MOVED);
LOGGER.info("the request offset too small. group={},
topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
index b43579fae..e9aaba388 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java
@@ -20,6 +20,7 @@ import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.BrokerPathConfigHelper;
import org.apache.rocketmq.common.ConfigManager;
@@ -251,6 +252,7 @@ public class SubscriptionGroupManager extends ConfigManager
{
}
}
+ @Override
public String encode(final boolean prettyFormat) {
return RemotingSerializable.toJson(this, prettyFormat);
}
@@ -294,4 +296,12 @@ public class SubscriptionGroupManager extends
ConfigManager {
this.subscriptionGroupTable.put(key,
otherSubscriptionGroupTable.get(key));
}
}
+
+ public boolean containsSubscriptionGroup(String group) {
+ if (StringUtils.isBlank(group)) {
+ return false;
+ }
+
+ return subscriptionGroupTable.containsKey(group);
+ }
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
index 4374164da..7bd289a6f 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
@@ -17,22 +17,34 @@
package org.apache.rocketmq.broker.offset;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.mockito.Mockito;
import static org.assertj.core.api.Assertions.assertThat;
public class ConsumerOffsetManagerTest {
+ private static final String KEY = "FooBar@FooBarGroup";
+
+ private BrokerController brokerController;
+
private ConsumerOffsetManager consumerOffsetManager;
- private static final String KEY = "FooBar@FooBarGroup";
@Before
public void init() {
- consumerOffsetManager = new ConsumerOffsetManager();
+ brokerController = Mockito.mock(BrokerController.class);
+ consumerOffsetManager = new ConsumerOffsetManager(brokerController);
+
+ MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
+
Mockito.when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
+
ConcurrentHashMap<String, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<>(512);
offsetTable.put(KEY,new ConcurrentHashMap<Integer, Long>() {{
put(1,2L);
@@ -52,4 +64,21 @@ public class ConsumerOffsetManagerTest {
consumerOffsetManager.cleanOffsetByTopic("FooBar");
assertThat(!consumerOffsetManager.getOffsetTable().containsKey(KEY)).isTrue();
}
+
+ @Test
+ public void testOffsetPersistInMemory() {
+ ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
consumerOffsetManager.getOffsetTable();
+ ConcurrentMap<Integer, Long> table = new ConcurrentHashMap<>();
+ table.put(0, 1L);
+ table.put(1, 3L);
+ String group = "G1";
+ offsetTable.put(group, table);
+
+ consumerOffsetManager.persist();
+ ConsumerOffsetManager manager = new
ConsumerOffsetManager(brokerController);
+ manager.load();
+
+ ConcurrentMap<Integer, Long> offsetTableLoaded =
manager.getOffsetTable().get(group);
+ Assert.assertEquals(table, offsetTableLoaded);
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 145a44d63..b327ee28b 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -2052,6 +2052,40 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
return invokeBrokerToResetOffset(addr, topic, group, timestamp,
isForce, timeoutMillis, false);
}
+ public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String
addr, final String topic, final String group,
+ final long timestamp, int queueId, Long offset, final long
timeoutMillis)
+ throws RemotingException, MQClientException, InterruptedException {
+
+ ResetOffsetRequestHeader requestHeader = new
ResetOffsetRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setGroup(group);
+ requestHeader.setQueueId(queueId);
+ requestHeader.setTimestamp(timestamp);
+ requestHeader.setOffset(offset);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET,
+ requestHeader);
+
+ RemotingCommand response = remotingClient.invokeSync(
+ MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
addr), request, timeoutMillis);
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ if (null != response.getBody()) {
+ return ResetOffsetBody.decode(response.getBody(),
ResetOffsetBody.class).getOffsetTable();
+ }
+ break;
+ }
+ case ResponseCode.TOPIC_NOT_EXIST:
+ case ResponseCode.SUBSCRIPTION_NOT_EXIST:
+ case ResponseCode.SYSTEM_ERROR:
+ log.warn("Invoke broker to reset offset error code={},
remark={}",
+ response.getCode(), response.getRemark());
+ break;
+ default:
+ break;
+ }
+ throw new MQClientException(response.getCode(), response.getRemark());
+ }
+
public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String
addr, final String topic, final String group,
final long timestamp, final boolean isForce, final long timeoutMillis,
boolean isC)
throws RemotingException, MQClientException, InterruptedException {
@@ -2060,6 +2094,8 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
requestHeader.setGroup(group);
requestHeader.setTimestamp(timestamp);
requestHeader.setForce(isForce);
+ // offset is -1 means offset is null
+ requestHeader.setOffset(-1L);
RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET,
requestHeader);
if (isC) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index fc49428bb..854ef6334 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -291,6 +291,8 @@ public class BrokerConfig extends BrokerIdentity {
private boolean asyncSendEnable = true;
+ private boolean useServerSideResetOffset = false;
+
private long consumerOffsetUpdateVersionStep = 500;
private long delayOffsetUpdateVersionStep = 200;
@@ -1356,4 +1358,12 @@ public class BrokerConfig extends BrokerIdentity {
public void setFetchNameSrvAddrByDnsLookup(boolean
fetchNameSrvAddrByDnsLookup) {
this.fetchNameSrvAddrByDnsLookup = fetchNameSrvAddrByDnsLookup;
}
+
+ public boolean isUseServerSideResetOffset() {
+ return useServerSideResetOffset;
+ }
+
+ public void setUseServerSideResetOffset(boolean useServerSideResetOffset) {
+ this.useServerSideResetOffset = useServerSideResetOffset;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
index b28e74b56..d2a97f893 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ResetOffsetBody.java
@@ -17,13 +17,19 @@
package org.apache.rocketmq.common.protocol.body;
+import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
public class ResetOffsetBody extends RemotingSerializable {
+
private Map<MessageQueue, Long> offsetTable;
+ public ResetOffsetBody() {
+ offsetTable = new HashMap<>();
+ }
+
public Map<MessageQueue, Long> getOffsetTable() {
return offsetTable;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ResetOffsetRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ResetOffsetRequestHeader.java
index c3bfa2189..78be60a76 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ResetOffsetRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ResetOffsetRequestHeader.java
@@ -22,12 +22,20 @@ import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class ResetOffsetRequestHeader implements CommandCustomHeader {
+
@CFNotNull
private String topic;
+
@CFNotNull
private String group;
+
+ private int queueId = -1;
+
+ private Long offset;
+
@CFNotNull
private long timestamp;
+
@CFNotNull
private boolean isForce;
@@ -63,6 +71,22 @@ public class ResetOffsetRequestHeader implements
CommandCustomHeader {
this.isForce = isForce;
}
+ public int getQueueId() {
+ return queueId;
+ }
+
+ public void setQueueId(int queueId) {
+ this.queueId = queueId;
+ }
+
+ public Long getOffset() {
+ return offset;
+ }
+
+ public void setOffset(Long offset) {
+ this.offset = offset;
+ }
+
@Override
public void checkFields() throws RemotingCommandException {
diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java
index 6a824b898..bc244865f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java
+++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageStatus.java
@@ -35,4 +35,6 @@ public enum GetMessageStatus {
NO_MATCHED_LOGIC_QUEUE,
NO_MESSAGE_IN_QUEUE,
+
+ OFFSET_RESET
}
diff --git a/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
index c2a3891dd..10eedd1b9 100644
--- a/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
+++ b/test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
@@ -19,7 +19,6 @@ package org.apache.rocketmq.test.listener;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
@@ -62,54 +61,40 @@ public class AbstractListener extends MQCollector
implements MessageListener {
super.lockCollectors();
}
- public Collection<Object> waitForMessageConsume(Collection<Object>
allSendMsgs,
- int timeoutMills) {
- this.allSendMsgs = allSendMsgs;
- List<Object> sendMsgs = new ArrayList<Object>();
- sendMsgs.addAll(allSendMsgs);
+ public Collection<Object> waitForMessageConsume(Collection<Object>
allSendMessages, int timeoutMills) {
+ this.allSendMsgs = allSendMessages;
+ List<Object> sendMessages = new ArrayList<>(allSendMessages);
long curTime = System.currentTimeMillis();
- while (!sendMsgs.isEmpty()) {
- Iterator<Object> iter = sendMsgs.iterator();
- while (iter.hasNext()) {
- Object msg = iter.next();
- if (msgBodys.getAllData().contains(msg)) {
- iter.remove();
- }
- }
- if (sendMsgs.isEmpty()) {
+ while (!sendMessages.isEmpty()) {
+ sendMessages.removeIf(msg -> msgBodys.getAllData().contains(msg));
+ if (sendMessages.isEmpty()) {
break;
} else {
if (System.currentTimeMillis() - curTime >= timeoutMills) {
- LOGGER.error(String.format("timeout but [%s] not recv
all send messages!",
- listenerName));
+ LOGGER.error(String.format("timeout but [%s] not recv all
send messages!", listenerName));
break;
} else {
- LOGGER.info(String.format("[%s] still [%s] msg not recv!",
listenerName,
- sendMsgs.size()));
+ LOGGER.info(String.format("[%s] still [%s] msg not recv!",
listenerName, sendMessages.size()));
TestUtil.waitForMonment(500);
}
}
}
-
- return sendMsgs;
+ return sendMessages;
}
- public long waitForMessageConsume(int size,
- int timeoutMills) {
-
+ public long waitForMessageConsume(int size, int timeoutMills) {
long curTime = System.currentTimeMillis();
while (true) {
if (msgBodys.getDataSize() >= size) {
break;
}
if (System.currentTimeMillis() - curTime >= timeoutMills) {
- LOGGER.error(String.format("timeout but [%s] not recv all
send messages!",
- listenerName));
+ LOGGER.error(String.format("timeout but [%s] not recv all
send messages!", listenerName));
break;
} else {
- LOGGER.info(String.format("[%s] still [%s] msg not recv!",
listenerName,
- size - msgBodys.getDataSize()));
+ LOGGER.info(String.format("[%s] still [%s] msg not recv!",
+ listenerName, size - msgBodys.getDataSize()));
TestUtil.waitForMonment(500);
}
}
diff --git
a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListener.java
b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListener.java
index 908aed1be..678216810 100644
---
a/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListener.java
+++
b/test/src/main/java/org/apache/rocketmq/test/listener/rmq/concurrent/RMQNormalListener.java
@@ -27,8 +27,10 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.test.listener.AbstractListener;
public class RMQNormalListener extends AbstractListener implements
MessageListenerConcurrently {
+
private ConsumeConcurrentlyStatus consumeStatus =
ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- private AtomicInteger msgIndex = new AtomicInteger(0);
+
+ private final AtomicInteger msgIndex = new AtomicInteger(0);
public RMQNormalListener() {
super();
@@ -47,6 +49,11 @@ public class RMQNormalListener extends AbstractListener
implements MessageListen
super(originMsgCollector, msgBodyCollector);
}
+ public AtomicInteger getMsgIndex() {
+ return msgIndex;
+ }
+
+ @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : msgs) {
diff --git a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 818bdbd65..035a8be68 100644
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@ -57,25 +57,31 @@ import org.slf4j.LoggerFactory;
import static org.awaitility.Awaitility.await;
public class BaseConf {
+
+ private final static Logger log = LoggerFactory.getLogger(BaseConf.class);
+
public final static String NAMESRV_ADDR;
+
+ // the logic queue test needs at least three brokers
+ protected final static String CLUSTER_NAME;
protected final static String BROKER1_NAME;
protected final static String BROKER2_NAME;
- //the logic queue test need at least three brokers
protected final static String BROKER3_NAME;
- protected final static String CLUSTER_NAME;
+
protected final static int BROKER_NUM = 3;
protected final static int WAIT_TIME = 5;
protected final static int CONSUME_TIME = 2 * 60 * 1000;
protected final static int QUEUE_NUMBERS = 8;
+
protected static NamesrvController namesrvController;
protected static BrokerController brokerController1;
protected static BrokerController brokerController2;
protected static BrokerController brokerController3;
protected static List<BrokerController> brokerControllerList;
protected static Map<String, BrokerController> brokerControllerMap;
+
protected static List<Object> mqClients = new ArrayList<Object>();
protected static boolean debug = false;
- private final static Logger log = LoggerFactory.getLogger(BaseConf.class);
static {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY,
Integer.toString(MQVersion.CURRENT_VERSION));
@@ -100,7 +106,8 @@ public class BaseConf {
BROKER2_NAME = brokerController2.getBrokerConfig().getBrokerName();
BROKER3_NAME = brokerController3.getBrokerConfig().getBrokerName();
brokerControllerList = ImmutableList.of(brokerController1,
brokerController2, brokerController3);
- brokerControllerMap =
brokerControllerList.stream().collect(Collectors.toMap(input ->
input.getBrokerConfig().getBrokerName(), Function.identity()));
+ brokerControllerMap = brokerControllerList.stream().collect(
+ Collectors.toMap(input -> input.getBrokerConfig().getBrokerName(),
Function.identity()));
}
public BaseConf() {
@@ -203,7 +210,7 @@ public class BaseConf {
}
public static DefaultMQAdminExt getAdmin(String nsAddr) {
- final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500);
+ final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(3 * 1000);
mqAdminExt.setNamesrvAddr(nsAddr);
mqAdminExt.setPollNameServerInterval(100);
mqClients.add(mqAdminExt);
diff --git
a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetIT.java
b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetIT.java
new file mode 100644
index 000000000..edf7b4d0d
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetIT.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.offset;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.admin.ConsumeStats;
+import org.apache.rocketmq.common.admin.OffsetWrapper;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.test.base.BaseConf;
+import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener;
+import org.apache.rocketmq.test.message.MessageQueueMsg;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+import static org.awaitility.Awaitility.await;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class OffsetResetIT extends BaseConf {
+
+ private static final Logger LOGGER = Logger.getLogger(OffsetResetIT.class);
+
+ private RMQNormalListener listener = null;
+ private RMQNormalProducer producer = null;
+ private RMQNormalConsumer consumer = null;
+ private DefaultMQAdminExt defaultMQAdminExt = null;
+ private String topic = null;
+
+ @Before
+ public void init() throws MQClientException {
+ topic = initTopic();
+ LOGGER.info(String.format("use topic: %s;", topic));
+
+ for (BrokerController controller : brokerControllerList) {
+ controller.getBrokerConfig().setLongPollingEnable(false);
+ controller.getBrokerConfig().setShortPollingTimeMills(500);
+ controller.getBrokerConfig().setUseServerSideResetOffset(true);
+ }
+
+ listener = new RMQNormalListener();
+ producer = getProducer(NAMESRV_ADDR, topic);
+ consumer = getConsumer(NAMESRV_ADDR, topic, "*", listener);
+
+ defaultMQAdminExt = BaseConf.getAdmin(NAMESRV_ADDR);
+ defaultMQAdminExt.start();
+ }
+
+ @After
+ public void tearDown() {
+ shutdown();
+ }
+
+ @Test
+ public void testEncodeOffsetHeader() {
+ ResetOffsetRequestHeader requestHeader = new
ResetOffsetRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setGroup(consumer.getConsumerGroup());
+ requestHeader.setTimestamp(System.currentTimeMillis());
+ requestHeader.setForce(false);
+
RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET,
requestHeader);
+ }
+
+ /**
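+ * Sum the consumer lag (broker offset minus consumer offset) over every queue of every broker, cross-checking both offsets against the consume stats returned by the mq admin tool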
+ */
+ private long getConsumerLag(String topic, String group) throws Exception {
+ long consumerLag = 0L;
+ for (BrokerController controller : brokerControllerList) {
+ ConsumeStats consumeStats =
defaultMQAdminExt.getDefaultMQAdminExtImpl()
+ .getMqClientInstance().getMQClientAPIImpl()
+ .getConsumeStats(controller.getBrokerAddr(), group, topic,
3000);
+ Map<MessageQueue, OffsetWrapper> offsetTable =
consumeStats.getOffsetTable();
+
+ for (Map.Entry<MessageQueue, OffsetWrapper> entry :
offsetTable.entrySet()) {
+ MessageQueue messageQueue = entry.getKey();
+ OffsetWrapper offsetWrapper = entry.getValue();
+
+ Assert.assertEquals(messageQueue.getBrokerName(),
controller.getBrokerConfig().getBrokerName());
+ long brokerOffset =
controller.getMessageStore().getMaxOffsetInQueue(topic,
messageQueue.getQueueId());
+ long consumerOffset =
controller.getConsumerOffsetManager().queryOffset(
+ consumer.getConsumerGroup(), topic,
messageQueue.getQueueId());
+ Assert.assertEquals(brokerOffset,
offsetWrapper.getBrokerOffset());
+ Assert.assertEquals(consumerOffset,
offsetWrapper.getConsumerOffset());
+
+ consumerLag += brokerOffset - consumerOffset;
+ }
+ }
+ return consumerLag;
+ }
+
+ @Test
+ public void testResetOffsetSingleQueue() throws Exception {
+ int msgSize = 100;
+ List<MessageQueue> mqs = producer.getMessageQueue();
+ MessageQueueMsg messageQueueMsg = new MessageQueueMsg(mqs, msgSize);
+
+ producer.send(messageQueueMsg.getMsgsWithMQ());
+ consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(),
CONSUME_TIME);
+
+
await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofMinutes(3)).until(
+ () -> 0L == this.getConsumerLag(topic,
consumer.getConsumerGroup()));
+
+ for (BrokerController controller : brokerControllerList) {
+ defaultMQAdminExt.resetOffsetByQueueId(controller.getBrokerAddr(),
+ consumer.getConsumerGroup(), consumer.getTopic(), 3, 0);
+ }
+
+ int hasConsumeBefore = listener.getMsgIndex().get();
+ int expectAfterReset = brokerControllerList.size() * msgSize;
+
await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofMinutes(3)).until(()
-> {
+ long receive = listener.getMsgIndex().get();
+ long expect = hasConsumeBefore + expectAfterReset;
+ return receive >= expect;
+ });
+ }
+
+ @Test
+ public void testResetOffsetTotal() throws Exception {
+ int msgSize = 100;
+ long start = System.currentTimeMillis();
+ List<MessageQueue> mqs = producer.getMessageQueue();
+ MessageQueueMsg messageQueueMsg = new MessageQueueMsg(mqs, msgSize);
+
+ producer.send(messageQueueMsg.getMsgsWithMQ());
+ consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(),
CONSUME_TIME);
+
+
await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofMinutes(3)).until(
+ () -> 0L == this.getConsumerLag(topic,
consumer.getConsumerGroup()));
+
+ for (BrokerController controller : brokerControllerList) {
+
defaultMQAdminExt.getDefaultMQAdminExtImpl().getMqClientInstance().getMQClientAPIImpl()
+ .invokeBrokerToResetOffset(controller.getBrokerAddr(),
+ consumer.getTopic(), consumer.getConsumerGroup(), start,
true, 3 * 1000);
+ }
+
+ int hasConsumeBefore = listener.getMsgIndex().get();
+ int expectAfterReset = mqs.size() * msgSize;
+
await().pollInterval(Duration.ofSeconds(1)).atMost(Duration.ofMinutes(3)).until(()
-> {
+ long receive = listener.getMsgIndex().get();
+ long expect = hasConsumeBefore + expectAfterReset;
+ return receive >= expect;
+ });
+ }
+}
\ No newline at end of file
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 06503962e..9f0fd4043 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.tools.admin;
+import com.alibaba.fastjson.JSON;
import java.io.UnsupportedEncodingException;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -1780,6 +1781,18 @@ public class DefaultMQAdminExtImpl implements
MQAdminExt, MQAdminExtInner {
requestHeader.setQueueId(queueId);
requestHeader.setCommitOffset(resetOffset);
this.mqClientInstance.getMQClientAPIImpl().updateConsumerOffset(brokerAddr,
requestHeader, timeoutMillis);
+ try {
+ Map<MessageQueue, Long> result =
mqClientInstance.getMQClientAPIImpl()
+ .invokeBrokerToResetOffset(brokerAddr, topicName,
consumeGroup, 0, queueId, resetOffset, timeoutMillis);
+ if (null != result) {
+ for (Map.Entry<MessageQueue, Long> entry : result.entrySet()) {
+ log.info("Reset single message queue {} offset from {} to
{}",
+ JSON.toJSONString(entry.getKey()), entry.getValue(),
resetOffset);
+ }
+ }
+ } catch (MQClientException e) {
+ throw new MQBrokerException(e.getResponseCode(), e.getMessage());
+ }
}
@Override
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
index f95c7e514..9c7b7ad9d 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
@@ -17,8 +17,8 @@
package org.apache.rocketmq.tools.command.offset;
-import java.util.Iterator;
import java.util.Map;
+import java.util.Objects;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
@@ -57,11 +57,11 @@ public class ResetOffsetByTimeCommand implements SubCommand
{
opt.setRequired(true);
options.addOption(opt);
- opt = new Option("f", "force", true, "set the force rollback by
timestamp switch[true|false]");
+ opt = new Option("f", "force", true, "set the force rollback by
timestamp switch[true|false]. Deprecated.");
opt.setRequired(false);
options.addOption(opt);
- opt = new Option("c", "cplus", false, "reset c++ client offset");
+ opt = new Option("c", "cplus", false, "reset c++ client offset.
Deprecated.");
opt.setRequired(false);
options.addOption(opt);
@@ -73,6 +73,10 @@ public class ResetOffsetByTimeCommand implements SubCommand {
opt.setRequired(false);
options.addOption(opt);
+ opt = new Option("o", "offset", true, "Expect queue offset, not
support old version broker");
+ opt.setRequired(false);
+ options.addOption(opt);
+
return options;
}
@@ -84,26 +88,23 @@ public class ResetOffsetByTimeCommand implements SubCommand
{
String group = commandLine.getOptionValue("g").trim();
String topic = commandLine.getOptionValue("t").trim();
String timeStampStr = commandLine.getOptionValue("s").trim();
- long timestamp = timeStampStr.equals("now") ? -1 : 0;
+ long timestamp = "now".equals(timeStampStr) ?
System.currentTimeMillis() : 0;
try {
if (timestamp == 0) {
timestamp = Long.parseLong(timeStampStr);
}
} catch (NumberFormatException e) {
-
- timestamp = UtilAll.parseDate(timeStampStr,
UtilAll.YYYY_MM_DD_HH_MM_SS_SSS).getTime();
+ timestamp = Objects.requireNonNull(
+ UtilAll.parseDate(timeStampStr,
UtilAll.YYYY_MM_DD_HH_MM_SS_SSS)).getTime();
}
boolean force = true;
if (commandLine.hasOption('f')) {
- force =
Boolean.valueOf(commandLine.getOptionValue("f").trim());
+ force =
Boolean.parseBoolean(commandLine.getOptionValue("f").trim());
}
- boolean isC = false;
- if (commandLine.hasOption('c')) {
- isC = true;
- }
+ boolean isC = commandLine.hasOption('c');
String brokerAddr = null;
if (commandLine.hasOption('b')) {
@@ -118,19 +119,24 @@ public class ResetOffsetByTimeCommand implements
SubCommand {
defaultMQAdminExt.setNamesrvAddr(commandLine.getOptionValue('n').trim());
}
+ Long offset = null;
+ if (commandLine.hasOption('o')) {
+ offset = Long.parseLong(commandLine.getOptionValue('o'));
+ }
+
defaultMQAdminExt.start();
- if (brokerAddr != null && queueId > -1) {
- System.out.printf("rollback consumer offset by specified
group[%s], topic[%s], queueId[%s], broker[%s], timestamp(string)[%s],
timestamp(long)[%s]%n",
+ if (brokerAddr != null && queueId >= 0) {
+ System.out.printf("start reset consumer offset by specified, "
+
+ "group[%s], topic[%s], queueId[%s], broker[%s],
timestamp(string)[%s], timestamp(long)[%s]%n",
group, topic, queueId, brokerAddr, timeStampStr,
timestamp);
- try {
- long resetOffset =
defaultMQAdminExt.searchOffset(brokerAddr, topic, queueId, timestamp, 3000);
- System.out.printf("Rollback Offset is: %s", resetOffset);
- if (resetOffset > 0) {
- defaultMQAdminExt.resetOffsetByQueueId(brokerAddr,
group, topic, queueId, resetOffset);
- }
- } catch (Throwable e) {
- throw e;
+
+ long resetOffset = null != offset ? offset :
+ defaultMQAdminExt.searchOffset(brokerAddr, topic, queueId,
timestamp, 3000);
+
+ System.out.printf("reset consumer offset to %d%n",
resetOffset);
+ if (resetOffset > 0) {
+ defaultMQAdminExt.resetOffsetByQueueId(brokerAddr, group,
topic, queueId, resetOffset);
}
return;
}
@@ -139,6 +145,7 @@ public class ResetOffsetByTimeCommand implements SubCommand
{
try {
offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic,
group, timestamp, force, isC);
} catch (MQClientException e) {
+ // if the consumer is not online, fall back to the old command to reset the offset
if (ResponseCode.CONSUMER_NOT_ONLINE == e.getResponseCode()) {
ResetOffsetByTimeOldCommand.resetOffset(defaultMQAdminExt,
group, topic, timestamp, force, timeStampStr);
return;
@@ -146,17 +153,13 @@ public class ResetOffsetByTimeCommand implements
SubCommand {
throw e;
}
- System.out.printf("rollback consumer offset by specified
group[%s], topic[%s], force[%s], timestamp(string)[%s], timestamp(long)[%s]%n",
+ System.out.printf("start reset consumer offset by specified, " +
+ "group[%s], topic[%s], force[%s], timestamp(string)[%s],
timestamp(long)[%s]%n",
group, topic, force, timeStampStr, timestamp);
- System.out.printf("%-40s %-40s %-40s%n",
- "#brokerName",
- "#queueId",
- "#offset");
+ System.out.printf("%-40s %-40s %-40s%n", "#brokerName",
"#queueId", "#offset");
- Iterator<Map.Entry<MessageQueue, Long>> iterator =
offsetTable.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<MessageQueue, Long> entry = iterator.next();
+ for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet())
{
System.out.printf("%-40s %-40d %-40d%n",
UtilAll.frontStringAtLeast(entry.getKey().getBrokerName(),
32),
entry.getKey().getQueueId(),
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
index 0c02d8f79..d86c3cf71 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeOldCommand.java
@@ -33,12 +33,16 @@ import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;
public class ResetOffsetByTimeOldCommand implements SubCommand {
+
public static void resetOffset(DefaultMQAdminExt defaultMQAdminExt, String
consumerGroup, String topic,
- long timestamp, boolean force,
- String timeStampStr) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
- List<RollbackStats> rollbackStatsList =
defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic, timestamp,
force);
- System.out.printf(
- "rollback consumer offset by specified consumerGroup[%s],
topic[%s], force[%s], timestamp(string)[%s], timestamp(long)[%s]%n",
+ long timestamp, boolean force, String timeStampStr)
+ throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
+
+ List<RollbackStats> rollbackStatsList =
+ defaultMQAdminExt.resetOffsetByTimestampOld(consumerGroup, topic,
timestamp, force);
+
+ System.out.printf("reset consumer offset by specified " +
+ "consumerGroup[%s], topic[%s], force[%s],
timestamp(string)[%s], timestamp(long)[%s]%n",
consumerGroup, topic, force, timeStampStr, timestamp);
System.out.printf("%-20s %-20s %-20s %-20s %-20s %-20s%n",
@@ -47,7 +51,7 @@ public class ResetOffsetByTimeOldCommand implements
SubCommand {
"#brokerOffset",
"#consumerOffset",
"#timestampOffset",
- "#rollbackOffset"
+ "#resetOffset"
);
for (RollbackStats rollbackStats : rollbackStatsList) {
@@ -115,7 +119,7 @@ public class ResetOffsetByTimeOldCommand implements
SubCommand {
boolean force = true;
if (commandLine.hasOption('f')) {
- force =
Boolean.valueOf(commandLine.getOptionValue("f").trim());
+ force =
Boolean.parseBoolean(commandLine.getOptionValue("f").trim());
}
defaultMQAdminExt.start();