This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6238caaac [ISSUE #6841] new feature: pop batch ack implementation broker-side (#6842)
6238caaac is described below
commit 6238caaac92fb1870f5eb234ddce86f3be045c79
Author: Quan <[email protected]>
AuthorDate: Mon Jun 19 11:06:50 2023 +0800
[ISSUE #6841] new feature: pop batch ack implementation broker-side (#6842)
---
.../apache/rocketmq/broker/BrokerController.java | 3 +
.../broker/processor/AckMessageProcessor.java | 288 +++++++++++++--------
.../broker/processor/PopBufferMergeService.java | 21 +-
.../processor/PopInflightMessageCounter.java | 22 +-
.../broker/processor/AckMessageProcessorTest.java | 252 +++++++++++++++++-
.../processor/PopInflightMessageCounterTest.java | 13 +-
.../protocol/BitSetSerializerDeserializer.java | 52 ++++
.../rocketmq/remoting/protocol/RequestCode.java | 1 +
.../rocketmq/remoting/protocol/body/BatchAck.java | 131 ++++++++++
.../protocol/body/BatchAckMessageRequestBody.java | 43 +++
.../remoting/protocol/header/ExtraInfoUtil.java | 13 +-
.../remoting/protocol/body/BatchAckTest.java | 112 ++++++++
12 files changed, 809 insertions(+), 142 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 7be1f20d9..03e9b3241 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1037,6 +1037,9 @@ public class BrokerController {
*/
this.remotingServer.registerProcessor(RequestCode.ACK_MESSAGE,
this.ackMessageProcessor, this.ackMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE,
this.ackMessageProcessor, this.ackMessageExecutor);
+
+ this.remotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE,
this.ackMessageProcessor, this.ackMessageExecutor);
+
this.fastRemotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE,
this.ackMessageProcessor, this.ackMessageExecutor);
/**
* ChangeInvisibleTimeProcessor
*/
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index fa1c0793e..2140aa881 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -36,18 +36,22 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.BatchAck;
+import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.pop.AckMsg;
+import org.apache.rocketmq.store.pop.BatchAckMsg;
public class AckMessageProcessor implements NettyRequestProcessor {
private static final Logger POP_LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final BrokerController brokerController;
- private String reviveTopic;
- private PopReviveService[] popReviveServices;
+ private final String reviveTopic;
+ private final PopReviveService[] popReviveServices;
public AckMessageProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
@@ -93,7 +97,7 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx,
- RemotingCommand request) throws RemotingCommandException {
+ RemotingCommand request) throws
RemotingCommandException {
return this.processRequest(ctx.channel(), request, true);
}
@@ -103,135 +107,209 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
}
private RemotingCommand processRequest(final Channel channel,
RemotingCommand request,
- boolean brokerAllowSuspend) throws RemotingCommandException {
- final AckMessageRequestHeader requestHeader =
(AckMessageRequestHeader)
request.decodeCommandCustomHeader(AckMessageRequestHeader.class);
- MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
- AckMsg ackMsg = new AckMsg();
- RemotingCommand response =
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
+ boolean brokerAllowSuspend) throws
RemotingCommandException {
+ AckMessageRequestHeader requestHeader;
+ BatchAckMessageRequestBody reqBody = null;
+ final RemotingCommand response =
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
response.setOpaque(request.getOpaque());
- TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
- if (null == topicConfig) {
- POP_LOGGER.error("The topic {} not exist, consumer: {} ",
requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
- response.setCode(ResponseCode.TOPIC_NOT_EXIST);
- response.setRemark(String.format("topic[%s] not exist, apply first
please! %s", requestHeader.getTopic(),
FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
- return response;
- }
+ if (request.getCode() == RequestCode.ACK_MESSAGE) {
+ requestHeader = (AckMessageRequestHeader)
request.decodeCommandCustomHeader(AckMessageRequestHeader.class);
- if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums() ||
requestHeader.getQueueId() < 0) {
- String errorInfo = String.format("queueId[%d] is illegal,
topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
- requestHeader.getQueueId(), requestHeader.getTopic(),
topicConfig.getReadQueueNums(), channel.remoteAddress());
- POP_LOGGER.warn(errorInfo);
- response.setCode(ResponseCode.MESSAGE_ILLEGAL);
- response.setRemark(errorInfo);
- return response;
- }
- long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
- long maxOffset =
this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
- if (requestHeader.getOffset() < minOffset || requestHeader.getOffset()
> maxOffset) {
- String errorInfo = String.format("offset is illegal, key:%s@%d,
commit:%d, store:%d~%d",
- requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getOffset(), minOffset, maxOffset);
- POP_LOGGER.warn(errorInfo);
- response.setCode(ResponseCode.NO_MESSAGE);
- response.setRemark(errorInfo);
- return response;
- }
- String[] extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
-
- ackMsg.setAckOffset(requestHeader.getOffset());
- ackMsg.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfo));
- ackMsg.setConsumerGroup(requestHeader.getConsumerGroup());
- ackMsg.setTopic(requestHeader.getTopic());
- ackMsg.setQueueId(requestHeader.getQueueId());
- ackMsg.setPopTime(ExtraInfoUtil.getPopTime(extraInfo));
- ackMsg.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));
-
- int rqId = ExtraInfoUtil.getReviveQid(extraInfo);
- long invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
-
- this.brokerController.getBrokerStatsManager().incBrokerAckNums(1);
-
this.brokerController.getBrokerStatsManager().incGroupAckNums(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), 1);
-
- if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
- // order
- String lockKey = requestHeader.getTopic() + PopAckConstants.SPLIT
- + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT +
requestHeader.getQueueId();
- long oldOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
- requestHeader.getTopic(), requestHeader.getQueueId());
- if (requestHeader.getOffset() < oldOffset) {
+ TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+ if (null == topicConfig) {
+ POP_LOGGER.error("The topic {} not exist, consumer: {} ",
requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
+ response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+ response.setRemark(String.format("topic[%s] not exist, apply
first please! %s", requestHeader.getTopic(),
FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
return response;
}
- while
(!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey))
{
+
+ if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums()
|| requestHeader.getQueueId() < 0) {
+ String errorInfo = String.format("queueId[%d] is illegal,
topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
+ requestHeader.getQueueId(), requestHeader.getTopic(),
topicConfig.getReadQueueNums(), channel.remoteAddress());
+ POP_LOGGER.warn(errorInfo);
+ response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+ response.setRemark(errorInfo);
+ return response;
}
- try {
- oldOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
- requestHeader.getTopic(), requestHeader.getQueueId());
- if (requestHeader.getOffset() < oldOffset) {
- return response;
+
+ long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
+ long maxOffset =
this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
+ if (requestHeader.getOffset() < minOffset ||
requestHeader.getOffset() > maxOffset) {
+ String errorInfo = String.format("offset is illegal,
key:%s@%d, commit:%d, store:%d~%d",
+ requestHeader.getTopic(), requestHeader.getQueueId(),
requestHeader.getOffset(), minOffset, maxOffset);
+ POP_LOGGER.warn(errorInfo);
+ response.setCode(ResponseCode.NO_MESSAGE);
+ response.setRemark(errorInfo);
+ return response;
+ }
+
+ appendAck(requestHeader, null, response, channel, null);
+ } else if (request.getCode() == RequestCode.BATCH_ACK_MESSAGE) {
+ if (request.getBody() != null) {
+ reqBody = BatchAckMessageRequestBody.decode(request.getBody(),
BatchAckMessageRequestBody.class);
+ }
+ if (reqBody == null || reqBody.getAcks() == null ||
reqBody.getAcks().isEmpty()) {
+ response.setCode(ResponseCode.NO_MESSAGE);
+ return response;
+ }
+ for (BatchAck bAck : reqBody.getAcks()) {
+ appendAck(null, bAck, response, channel,
reqBody.getBrokerName());
+ }
+ } else {
+ POP_LOGGER.error("AckMessageProcessor failed to process
RequestCode: {}, consumer: {} ", request.getCode(),
RemotingHelper.parseChannelRemoteAddr(channel));
+ response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+ response.setRemark(String.format("AckMessageProcessor failed to
process RequestCode: %d", request.getCode()));
+ return response;
+ }
+ return response;
+ }
+
+ private void appendAck(final AckMessageRequestHeader requestHeader, final
BatchAck batchAck, final RemotingCommand response, final Channel channel,
String brokerName) {
+ String[] extraInfo;
+ String consumeGroup, topic;
+ int qId, rqId;
+ long startOffset, ackOffset;
+ long popTime, invisibleTime;
+ AckMsg ackMsg;
+ int ackCount = 0;
+ if (batchAck == null) {
+ // single ack
+ extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
+ brokerName = ExtraInfoUtil.getBrokerName(extraInfo);
+ consumeGroup = requestHeader.getConsumerGroup();
+ topic = requestHeader.getTopic();
+ qId = requestHeader.getQueueId();
+ rqId = ExtraInfoUtil.getReviveQid(extraInfo);
+ startOffset = ExtraInfoUtil.getCkQueueOffset(extraInfo);
+ ackOffset = requestHeader.getOffset();
+ popTime = ExtraInfoUtil.getPopTime(extraInfo);
+ invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
+
+ if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
+ // order
+ String lockKey = topic + PopAckConstants.SPLIT + consumeGroup
+ PopAckConstants.SPLIT + qId;
+ long oldOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup,
topic, qId);
+ if (ackOffset < oldOffset) {
+ return;
+ }
+ while
(!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey))
{
}
- long nextOffset =
brokerController.getConsumerOrderInfoManager().commitAndNext(
- requestHeader.getTopic(), requestHeader.getConsumerGroup(),
- requestHeader.getQueueId(), requestHeader.getOffset(),
- ExtraInfoUtil.getPopTime(extraInfo));
- if (nextOffset > -1) {
- if
(!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
- requestHeader.getTopic(),
requestHeader.getConsumerGroup(), requestHeader.getQueueId())) {
-
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
- requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(), nextOffset);
+ try {
+ oldOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup,
topic, qId);
+ if (ackOffset < oldOffset) {
+ return;
}
- if
(!this.brokerController.getConsumerOrderInfoManager().checkBlock(null,
requestHeader.getTopic(),
- requestHeader.getConsumerGroup(),
requestHeader.getQueueId(), invisibleTime)) {
-
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
- requestHeader.getTopic(),
requestHeader.getConsumerGroup(), requestHeader.getQueueId());
+ long nextOffset =
brokerController.getConsumerOrderInfoManager().commitAndNext(
+ topic, consumeGroup,
+ qId, ackOffset,
+ popTime);
+ if (nextOffset > -1) {
+ if
(!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
+ topic, consumeGroup, qId)) {
+
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
+ consumeGroup, topic, qId, nextOffset);
+ }
+ if
(!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
+ consumeGroup, qId, invisibleTime)) {
+
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
+ topic, consumeGroup, qId);
+ }
+ } else if (nextOffset == -1) {
+ String errorInfo = String.format("offset is illegal,
key:%s, old:%d, commit:%d, next:%d, %s",
+ lockKey, oldOffset, ackOffset, nextOffset,
channel.remoteAddress());
+ POP_LOGGER.warn(errorInfo);
+ response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+ response.setRemark(errorInfo);
+ return;
}
- } else if (nextOffset == -1) {
- String errorInfo = String.format("offset is illegal,
key:%s, old:%d, commit:%d, next:%d, %s",
- lockKey, oldOffset, requestHeader.getOffset(),
nextOffset, channel.remoteAddress());
- POP_LOGGER.warn(errorInfo);
- response.setCode(ResponseCode.MESSAGE_ILLEGAL);
- response.setRemark(errorInfo);
- return response;
+ } finally {
+
this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
}
- } finally {
-
this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
+
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
consumeGroup, popTime, qId, ackCount);
+ return;
}
- decInFlightMessageNum(requestHeader);
- return response;
+
+ ackMsg = new AckMsg();
+ ackCount = 1;
+ } else {
+ // batch ack
+ consumeGroup = batchAck.getConsumerGroup();
+ topic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(),
batchAck.getConsumerGroup(),
ExtraInfoUtil.RETRY_TOPIC.equals(batchAck.getRetry()));
+ qId = batchAck.getQueueId();
+ rqId = batchAck.getReviveQueueId();
+ startOffset = batchAck.getStartOffset();
+ ackOffset = -1;
+ popTime = batchAck.getPopTime();
+ invisibleTime = batchAck.getInvisibleTime();
+
+ long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, qId);
+ long maxOffset =
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, qId);
+ if (minOffset == -1 || maxOffset == -1) {
+ POP_LOGGER.error("Illegal topic or queue found when batch ack
{}", batchAck);
+ return;
+ }
+
+ BatchAckMsg batchAckMsg = new BatchAckMsg();
+ for (int i = 0; batchAck.getBitSet() != null && i <
batchAck.getBitSet().length(); i++) {
+ if (!batchAck.getBitSet().get(i)) {
+ continue;
+ }
+ long offset = startOffset + i;
+ if (offset < minOffset || offset > maxOffset) {
+ continue;
+ }
+ batchAckMsg.getAckOffsetList().add(offset);
+ }
+ if (batchAckMsg.getAckOffsetList().isEmpty()) {
+ return;
+ }
+
+ ackMsg = batchAckMsg;
+ ackCount = batchAckMsg.getAckOffsetList().size();
}
+
this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount);
+
this.brokerController.getBrokerStatsManager().incGroupAckNums(consumeGroup,
topic, ackCount);
+
+ ackMsg.setConsumerGroup(consumeGroup);
+ ackMsg.setTopic(topic);
+ ackMsg.setQueueId(qId);
+ ackMsg.setStartOffset(startOffset);
+ ackMsg.setAckOffset(ackOffset);
+ ackMsg.setPopTime(popTime);
+ ackMsg.setBrokerName(brokerName);
+
if
(this.brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId,
ackMsg)) {
- decInFlightMessageNum(requestHeader);
- return response;
+
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
consumeGroup, popTime, qId, ackCount);
+ return;
}
+ MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(reviveTopic);
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));
- //msgInner.setQueueId(Integer.valueOf(extraInfo[3]));
msgInner.setQueueId(rqId);
- msgInner.setTags(PopAckConstants.ACK_TAG);
+ if (ackMsg instanceof BatchAckMsg) {
+ msgInner.setTags(PopAckConstants.BATCH_ACK_TAG);
+
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
PopMessageProcessor.genBatchAckUniqueId((BatchAckMsg) ackMsg));
+ } else {
+ msgInner.setTags(PopAckConstants.ACK_TAG);
+
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
PopMessageProcessor.genAckUniqueId(ackMsg));
+ }
msgInner.setBornTimestamp(System.currentTimeMillis());
msgInner.setBornHost(this.brokerController.getStoreHost());
msgInner.setStoreHost(this.brokerController.getStoreHost());
- msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) +
invisibleTime);
+ msgInner.setDeliverTimeMs(popTime + invisibleTime);
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
PopMessageProcessor.genAckUniqueId(ackMsg));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult =
this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
- && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
- && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
- && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
+ && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_DISK_TIMEOUT
+ && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
+ && putMessageResult.getPutMessageStatus() !=
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
POP_LOGGER.error("put ack msg error:" + putMessageResult);
}
+ System.out.printf("put ack to store %s", ackMsg);
PopMetricsManager.incPopReviveAckPutCount(ackMsg,
putMessageResult.getPutMessageStatus());
- decInFlightMessageNum(requestHeader);
- return response;
+
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
consumeGroup, popTime, qId, ackCount);
}
-
- private void decInFlightMessageNum(AckMessageRequestHeader requestHeader) {
-
this.brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(
- requestHeader.getTopic(),
- requestHeader.getConsumerGroup(),
- requestHeader.getExtraInfo()
- );
- }
-
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index c5889f556..d7bc7c694 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -538,12 +538,23 @@ public class PopBufferMergeService extends ServiceThread {
return false;
}
- int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
- if (indexOfAck > -1) {
- markBitCAS(pointWrapper.getBits(), indexOfAck);
+ if (ackMsg instanceof BatchAckMsg) {
+ for (Long ackOffset : ((BatchAckMsg)
ackMsg).getAckOffsetList()) {
+ int indexOfAck = point.indexOfAck(ackOffset);
+ if (indexOfAck > -1) {
+ markBitCAS(pointWrapper.getBits(), indexOfAck);
+ } else {
+ POP_LOGGER.error("[PopBuffer]Invalid index of ack,
reviveQid={}, {}, {}", reviveQid, ackMsg, point);
+ }
+ }
} else {
- POP_LOGGER.error("[PopBuffer]Invalid index of ack,
reviveQid={}, {}, {}", reviveQid, ackMsg, point);
- return true;
+ int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
+ if (indexOfAck > -1) {
+ markBitCAS(pointWrapper.getBits(), indexOfAck);
+ } else {
+ POP_LOGGER.error("[PopBuffer]Invalid index of ack,
reviveQid={}, {}, {}", reviveQid, ackMsg, point);
+ return true;
+ }
}
if (brokerController.getBrokerConfig().isEnablePopLog()) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java
index 584cc54ba..6749af3d7 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java
@@ -16,18 +16,18 @@
*/
package org.apache.rocketmq.broker.processor;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.store.pop.PopCheckPoint;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
public class PopInflightMessageCounter {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -61,26 +61,24 @@ public class PopInflightMessageCounter {
});
}
- public void decrementInFlightMessageNum(String topic, String group, String
ckInfo) {
- String[] ckInfoList = ExtraInfoUtil.split(ckInfo);
- long popTime = ExtraInfoUtil.getPopTime(ckInfoList);
+ public void decrementInFlightMessageNum(String topic, String group, long
popTime, int qId, int delta) {
if (popTime < this.brokerController.getShouldStartTime()) {
return;
}
- decrementInFlightMessageNum(topic, group,
ExtraInfoUtil.getQueueId(ckInfoList));
+ decrementInFlightMessageNum(topic, group, qId, delta);
}
public void decrementInFlightMessageNum(PopCheckPoint checkPoint) {
if (checkPoint.getPopTime() <
this.brokerController.getShouldStartTime()) {
return;
}
- decrementInFlightMessageNum(checkPoint.getTopic(),
checkPoint.getCId(), checkPoint.getQueueId());
+ decrementInFlightMessageNum(checkPoint.getTopic(),
checkPoint.getCId(), checkPoint.getQueueId(), 1);
}
- public void decrementInFlightMessageNum(String topic, String group, int
queueId) {
+ private void decrementInFlightMessageNum(String topic, String group, int
queueId, int delta) {
topicInFlightMessageNum.computeIfPresent(buildKey(topic, group), (key,
queueNum) -> {
queueNum.computeIfPresent(queueId, (queueIdKey, counter) -> {
- if (counter.decrementAndGet() <= 0) {
+ if (counter.addAndGet(-delta) <= 0) {
return null;
}
return counter;
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AckMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AckMessageProcessorTest.java
index 6719df08f..c0afb46c3 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/AckMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/AckMessageProcessorTest.java
@@ -18,12 +18,12 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
-import java.lang.reflect.Field;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ClientChannelInfo;
import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.failover.EscapeBridge;
import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
@@ -36,6 +36,8 @@ import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.BatchAck;
+import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
@@ -53,15 +55,25 @@ import org.mockito.Mockito;
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+
import static
org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class AckMessageProcessorTest {
private AckMessageProcessor ackMessageProcessor;
+ @Mock
+ private PopMessageProcessor popMessageProcessor;
@Spy
private BrokerController brokerController = new BrokerController(new
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new
MessageStoreConfig());
@Mock
@@ -77,6 +89,9 @@ public class AckMessageProcessorTest {
@Mock
private Broker2Client broker2Client;
+ private static final long MIN_OFFSET_IN_QUEUE = 100;
+ private static final long MAX_OFFSET_IN_QUEUE = 999;
+
@Before
public void init() throws IllegalAccessException, NoSuchFieldException {
clientInfo = new ClientChannelInfo(channel, "127.0.0.1",
LanguageCode.JAVA, 0);
@@ -91,19 +106,27 @@ public class AckMessageProcessorTest {
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new
TopicConfig());
ConsumerData consumerData = createConsumerData(group, topic);
brokerController.getConsumerManager().registerConsumer(
- consumerData.getGroupName(),
- clientInfo,
- consumerData.getConsumeType(),
- consumerData.getMessageModel(),
- consumerData.getConsumeFromWhere(),
- consumerData.getSubscriptionDataSet(),
- false);
+ consumerData.getGroupName(),
+ clientInfo,
+ consumerData.getConsumeType(),
+ consumerData.getMessageModel(),
+ consumerData.getConsumeFromWhere(),
+ consumerData.getSubscriptionDataSet(),
+ false);
ackMessageProcessor = new AckMessageProcessor(brokerController);
+
+ when(messageStore.getMinOffsetInQueue(anyString(),
anyInt())).thenReturn(MIN_OFFSET_IN_QUEUE);
+ when(messageStore.getMaxOffsetInQueue(anyString(),
anyInt())).thenReturn(MAX_OFFSET_IN_QUEUE);
+
+
when(brokerController.getPopMessageProcessor()).thenReturn(popMessageProcessor);
}
@Test
public void testProcessRequest_Success() throws RemotingCommandException,
InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new
PutMessageResult(PutMessageStatus.PUT_OK, new
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+ PopBufferMergeService popBufferMergeService =
mock(PopBufferMergeService.class);
+ when(popBufferMergeService.addAk(anyInt(), any())).thenReturn(false);
+
when(popMessageProcessor.getPopBufferMergeService()).thenReturn(popBufferMergeService);
int queueId = 0;
long queueOffset = 0;
@@ -112,11 +135,11 @@ public class AckMessageProcessorTest {
int reviveQid = 0;
String brokerName = "test_broker";
String extraInfo = ExtraInfoUtil.buildExtraInfo(queueOffset, popTime,
invisibleTime, reviveQid,
- topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR +
queueOffset;
+ topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR +
queueOffset;
AckMessageRequestHeader requestHeader = new AckMessageRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setQueueId(0);
- requestHeader.setOffset(0L);
+ requestHeader.setOffset(MIN_OFFSET_IN_QUEUE + 1);
requestHeader.setConsumerGroup(group);
requestHeader.setExtraInfo(extraInfo);
@@ -126,4 +149,213 @@ public class AckMessageProcessorTest {
assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
}
+
+ @Test
+ public void testProcessRequest_WrongRequestCode() throws Exception {
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, null);
+ RemotingCommand response =
ackMessageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.MESSAGE_ILLEGAL);
+ assertThat(response.getRemark()).isEqualTo("AckMessageProcessor failed
to process RequestCode: " + RequestCode.SEND_MESSAGE);
+ }
+
+ @Test
+ public void testSingleAck_TopicCheck() throws RemotingCommandException {
+ AckMessageRequestHeader requestHeader = new AckMessageRequestHeader();
+ requestHeader.setTopic("wrongTopic");
+ requestHeader.setQueueId(0);
+ requestHeader.setOffset(0L);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
ackMessageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
+ assertThat(response.getRemark()).contains("not exist, apply first");
+ }
+
+ @Test
+ public void testSingleAck_QueueCheck() throws RemotingCommandException {
+ {
+ int qId = -1;
+ AckMessageRequestHeader requestHeader = new
AckMessageRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(qId);
+ requestHeader.setOffset(0L);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
ackMessageProcessor.processRequest(handlerContext, request);
+
assertThat(response.getCode()).isEqualTo(ResponseCode.MESSAGE_ILLEGAL);
+ assertThat(response.getRemark()).contains("queueId[" + qId + "] is
illegal");
+ }
+
+ {
+ int qId = 17;
+ AckMessageRequestHeader requestHeader = new
AckMessageRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(qId);
+ requestHeader.setOffset(0L);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
ackMessageProcessor.processRequest(handlerContext, request);
+
assertThat(response.getCode()).isEqualTo(ResponseCode.MESSAGE_ILLEGAL);
+ assertThat(response.getRemark()).contains("queueId[" + qId + "] is
illegal");
+ }
+ }
+
+ @Test
+ public void testSingleAck_OffsetCheck() throws RemotingCommandException {
+ {
+ AckMessageRequestHeader requestHeader = new
AckMessageRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(0);
+ requestHeader.setOffset(MIN_OFFSET_IN_QUEUE - 1);
+ //requestHeader.setOffset(maxOffsetInQueue + 1);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
ackMessageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
+ assertThat(response.getRemark()).contains("offset is illegal");
+ }
+
+ {
+ AckMessageRequestHeader requestHeader = new
AckMessageRequestHeader();
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(0);
+ //requestHeader.setOffset(minOffsetInQueue - 1);
+ requestHeader.setOffset(MAX_OFFSET_IN_QUEUE + 1);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
ackMessageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
+ assertThat(response.getRemark()).contains("offset is illegal");
+ }
+ }
+
+ @Test
+ public void testBatchAck_NoMessage() throws RemotingCommandException {
+ {
+ //reqBody == null
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+ RemotingCommand response =
ackMessageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
+ }
+
+ {
+ //reqBody.getAcks() == null
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+ BatchAckMessageRequestBody reqBody = new
BatchAckMessageRequestBody();
+ request.setBody(reqBody.encode());
+ RemotingCommand response =
ackMessageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
+ }
+
+ {
+ //reqBody.getAcks().isEmpty()
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+ BatchAckMessageRequestBody reqBody = new
BatchAckMessageRequestBody();
+ reqBody.setAcks(new ArrayList<>());
+ request.setBody(reqBody.encode());
+ RemotingCommand response =
ackMessageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
+ }
+ }
+
+ @Test
+ public void testSingleAck_appendAck() throws RemotingCommandException {
+ {
+ // buffer addAk OK
+ PopBufferMergeService popBufferMergeService =
mock(PopBufferMergeService.class);
+ when(popBufferMergeService.addAk(anyInt(),
any())).thenReturn(true);
+
when(popMessageProcessor.getPopBufferMergeService()).thenReturn(popBufferMergeService);
+
+ AckMessageRequestHeader requestHeader = new
AckMessageRequestHeader();
+ long ackOffset = MIN_OFFSET_IN_QUEUE + 10;
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(0);
+ requestHeader.setOffset(ackOffset);
+ requestHeader.setConsumerGroup(MixAll.DEFAULT_CONSUMER_GROUP);
+ requestHeader.setExtraInfo("64 1666860736757 60000 4 0 broker-a 0
" + ackOffset);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
ackMessageProcessor.processRequest(handlerContext, request);
+
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ {
+ // buffer addAk fail
+ PopBufferMergeService popBufferMergeService =
mock(PopBufferMergeService.class);
+ when(popBufferMergeService.addAk(anyInt(),
any())).thenReturn(false);
+
when(popMessageProcessor.getPopBufferMergeService()).thenReturn(popBufferMergeService);
+ // store putMessage OK
+ PutMessageResult putMessageResult = new
PutMessageResult(PutMessageStatus.PUT_OK, null);
+ when(messageStore.putMessage(any())).thenReturn(putMessageResult);
+
+ AckMessageRequestHeader requestHeader = new
AckMessageRequestHeader();
+ long ackOffset = MIN_OFFSET_IN_QUEUE + 10;
+ requestHeader.setTopic(topic);
+ requestHeader.setQueueId(0);
+ requestHeader.setOffset(ackOffset);
+ requestHeader.setConsumerGroup(MixAll.DEFAULT_CONSUMER_GROUP);
+ requestHeader.setExtraInfo("64 1666860736757 60000 4 0 broker-a 0
" + ackOffset);
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
ackMessageProcessor.processRequest(handlerContext, request);
+
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+ }
+
+ @Test
+ public void testBatchAck_appendAck() throws RemotingCommandException {
+ {
+ // buffer addAk OK
+ PopBufferMergeService popBufferMergeService =
mock(PopBufferMergeService.class);
+ when(popBufferMergeService.addAk(anyInt(),
any())).thenReturn(true);
+
when(popMessageProcessor.getPopBufferMergeService()).thenReturn(popBufferMergeService);
+
+ BatchAck bAck1 = new BatchAck();
+ bAck1.setConsumerGroup(MixAll.DEFAULT_CONSUMER_GROUP);
+ bAck1.setTopic(topic);
+ bAck1.setStartOffset(MIN_OFFSET_IN_QUEUE);
+ bAck1.setBitSet(new BitSet());
+ bAck1.getBitSet().set(1);
+ bAck1.setRetry("0");
+
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+ BatchAckMessageRequestBody reqBody = new
BatchAckMessageRequestBody();
+ reqBody.setAcks(Collections.singletonList(bAck1));
+ request.setBody(reqBody.encode());
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
ackMessageProcessor.processRequest(handlerContext, request);
+
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ {
+ // buffer addAk fail
+ PopBufferMergeService popBufferMergeService =
mock(PopBufferMergeService.class);
+ when(popBufferMergeService.addAk(anyInt(),
any())).thenReturn(false);
+
when(popMessageProcessor.getPopBufferMergeService()).thenReturn(popBufferMergeService);
+ // store putMessage OK
+ PutMessageResult putMessageResult = new
PutMessageResult(PutMessageStatus.PUT_OK, null);
+ when(messageStore.putMessage(any())).thenReturn(putMessageResult);
+
+ BatchAck bAck1 = new BatchAck();
+ bAck1.setConsumerGroup(MixAll.DEFAULT_CONSUMER_GROUP);
+ bAck1.setTopic(topic);
+ bAck1.setStartOffset(MIN_OFFSET_IN_QUEUE);
+ bAck1.setBitSet(new BitSet());
+ bAck1.getBitSet().set(1);
+ bAck1.setRetry("0");
+
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+ BatchAckMessageRequestBody reqBody = new
BatchAckMessageRequestBody();
+ reqBody.setAcks(Arrays.asList(bAck1));
+ request.setBody(reqBody.encode());
+ request.makeCustomHeaderToNet();
+ RemotingCommand response =
ackMessageProcessor.processRequest(handlerContext, request);
+
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+ }
+
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java
index 4e83ac749..dea59fc99 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.broker.processor;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
import org.apache.rocketmq.store.pop.PopCheckPoint;
import org.junit.Test;
@@ -42,12 +41,10 @@ public class PopInflightMessageCounterTest {
counter.incrementInFlightMessageNum(topic, group, 0, 3);
assertEquals(3, counter.getGroupPopInFlightMessageNum(topic, group,
0));
- counter.decrementInFlightMessageNum(topic, group,
ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis(),
- 0, 0, topic, "broker", 0));
+ counter.decrementInFlightMessageNum(topic, group,
System.currentTimeMillis(), 0, 1);
assertEquals(2, counter.getGroupPopInFlightMessageNum(topic, group,
0));
- counter.decrementInFlightMessageNum(topic, group,
ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis() - 1000,
- 0, 0, topic, "broker", 0));
+ counter.decrementInFlightMessageNum(topic, group,
System.currentTimeMillis() - 1000, 0, 1);
assertEquals(2, counter.getGroupPopInFlightMessageNum(topic, group,
0));
PopCheckPoint popCheckPoint = new PopCheckPoint();
@@ -59,12 +56,10 @@ public class PopInflightMessageCounterTest {
counter.decrementInFlightMessageNum(popCheckPoint);
assertEquals(1, counter.getGroupPopInFlightMessageNum(topic, group,
0));
- counter.decrementInFlightMessageNum(topic, group,
ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis(),
- 0, 0, topic, "broker", 0));
+ counter.decrementInFlightMessageNum(topic, group,
System.currentTimeMillis(), 0 ,1);
assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group,
0));
- counter.decrementInFlightMessageNum(topic, group,
ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis(),
- 0, 0, topic, "broker", 0));
+ counter.decrementInFlightMessageNum(topic, group,
System.currentTimeMillis(), 0, 1);
assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group,
0));
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/BitSetSerializerDeserializer.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/BitSetSerializerDeserializer.java
new file mode 100644
index 000000000..8f53c0250
--- /dev/null
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/BitSetSerializerDeserializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol;
+
+import com.alibaba.fastjson.parser.DefaultJSONParser;
+import com.alibaba.fastjson.parser.JSONToken;
+import com.alibaba.fastjson.parser.deserializer.ObjectDeserializer;
+import com.alibaba.fastjson.serializer.JSONSerializer;
+import com.alibaba.fastjson.serializer.ObjectSerializer;
+import com.alibaba.fastjson.serializer.SerializeWriter;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.BitSet;
+
+public class BitSetSerializerDeserializer implements ObjectSerializer,
ObjectDeserializer {
+
+ @Override
+ public void write(JSONSerializer serializer, Object object, Object
fieldName, Type fieldType, int features) throws IOException {
+ SerializeWriter out = serializer.out;
+ out.writeByteArray(((BitSet) object).toByteArray());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T deserialze(DefaultJSONParser parser, Type type, Object
fieldName) {
+ byte[] bytes = parser.parseObject(byte[].class);
+ if (bytes != null) {
+ return (T) BitSet.valueOf(bytes);
+ }
+ return null;
+ }
+
+ @Override
+ public int getFastMatchToken() {
+ return JSONToken.LITERAL_STRING;
+ }
+}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
index ec87039b4..0b1a5e010 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
@@ -88,6 +88,7 @@ public class RequestCode {
public static final int POP_MESSAGE = 200050;
public static final int ACK_MESSAGE = 200051;
+ public static final int BATCH_ACK_MESSAGE = 200151;
public static final int PEEK_MESSAGE = 200052;
public static final int CHANGE_MESSAGE_INVISIBLETIME = 200053;
public static final int NOTIFICATION = 200054;
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAck.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAck.java
new file mode 100644
index 000000000..82dcd8567
--- /dev/null
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAck.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.body;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import org.apache.rocketmq.remoting.protocol.BitSetSerializerDeserializer;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+public class BatchAck implements Serializable {
+ @JSONField(name = "c", alternateNames = {"consumerGroup"})
+ private String consumerGroup;
+ @JSONField(name = "t", alternateNames = {"topic"})
+ private String topic;
+ @JSONField(name = "r", alternateNames = {"retry"})
+ private String retry; // "1" if is retry topic
+ @JSONField(name = "so", alternateNames = {"startOffset"})
+ private long startOffset;
+ @JSONField(name = "q", alternateNames = {"queueId"})
+ private int queueId;
+ @JSONField(name = "rq", alternateNames = {"reviveQueueId"})
+ private int reviveQueueId;
+ @JSONField(name = "pt", alternateNames = {"popTime"})
+ private long popTime;
+ @JSONField(name = "it", alternateNames = {"invisibleTime"})
+ private long invisibleTime;
+ @JSONField(name = "b", alternateNames = {"bitSet"}, serializeUsing =
BitSetSerializerDeserializer.class, deserializeUsing =
BitSetSerializerDeserializer.class)
+ private BitSet bitSet; // ack offsets bitSet
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getRetry() {
+ return retry;
+ }
+
+ public void setRetry(String retry) {
+ this.retry = retry;
+ }
+
+ public long getStartOffset() {
+ return startOffset;
+ }
+
+ public void setStartOffset(long startOffset) {
+ this.startOffset = startOffset;
+ }
+
+ public int getQueueId() {
+ return queueId;
+ }
+
+ public void setQueueId(int queueId) {
+ this.queueId = queueId;
+ }
+
+ public int getReviveQueueId() {
+ return reviveQueueId;
+ }
+
+ public void setReviveQueueId(int reviveQueueId) {
+ this.reviveQueueId = reviveQueueId;
+ }
+
+ public long getPopTime() {
+ return popTime;
+ }
+
+ public void setPopTime(long popTime) {
+ this.popTime = popTime;
+ }
+
+ public long getInvisibleTime() {
+ return invisibleTime;
+ }
+
+ public void setInvisibleTime(long invisibleTime) {
+ this.invisibleTime = invisibleTime;
+ }
+
+ public BitSet getBitSet() {
+ return bitSet;
+ }
+
+ public void setBitSet(BitSet bitSet) {
+ this.bitSet = bitSet;
+ }
+
+ @Override
+ public String toString() {
+ return "BatchAck{" +
+ "consumerGroup='" + consumerGroup + '\'' +
+ ", topic='" + topic + '\'' +
+ ", retry='" + retry + '\'' +
+ ", startOffset=" + startOffset +
+ ", queueId=" + queueId +
+ ", reviveQueueId=" + reviveQueueId +
+ ", popTime=" + popTime +
+ ", invisibleTime=" + invisibleTime +
+ ", bitSet=" + bitSet +
+ '}';
+ }
+}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAckMessageRequestBody.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAckMessageRequestBody.java
new file mode 100644
index 000000000..f0e1a8c3c
--- /dev/null
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAckMessageRequestBody.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.List;
+
+public class BatchAckMessageRequestBody extends RemotingSerializable {
+ private String brokerName;
+ private List<BatchAck> acks;
+
+ public String getBrokerName() {
+ return brokerName;
+ }
+
+ public void setBrokerName(String brokerName) {
+ this.brokerName = brokerName;
+ }
+
+ public List<BatchAck> getAcks() {
+ return acks;
+ }
+
+ public void setAcks(List<BatchAck> acks) {
+ this.acks = acks;
+ }
+}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
index 7172ba959..9a5fa89ab 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
@@ -26,7 +26,7 @@ import org.apache.rocketmq.common.message.MessageConst;
public class ExtraInfoUtil {
private static final String NORMAL_TOPIC = "0";
- private static final String RETRY_TOPIC = "1";
+ public static final String RETRY_TOPIC = "1";
private static final String QUEUE_OFFSET = "qo";
public static String[] split(String extraInfo) {
@@ -75,6 +75,17 @@ public class ExtraInfoUtil {
}
}
+ public static String getRealTopic(String topic, String cid, boolean
isRetry) {
+ return isRetry ? KeyBuilder.buildPopRetryTopic(topic, cid) : topic;
+ }
+
+ public static String getRetry(String[] extraInfoStrs) {
+ if (extraInfoStrs == null || extraInfoStrs.length < 5) {
+ throw new IllegalArgumentException("getRetry fail, extraInfoStrs
length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
+ }
+ return extraInfoStrs[4];
+ }
+
public static String getBrokerName(String[] extraInfoStrs) {
if (extraInfoStrs == null || extraInfoStrs.length < 6) {
throw new IllegalArgumentException("getBrokerName fail,
extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
diff --git
a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/body/BatchAckTest.java
b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/body/BatchAckTest.java
new file mode 100644
index 000000000..427a132d6
--- /dev/null
+++
b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/body/BatchAckTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.body;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.common.MixAll;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class BatchAckTest {
+ private static String topic = "myTopic";
+ private static String cid = MixAll.DEFAULT_CONSUMER_GROUP;
+ private static long startOffset = 100;
+ private static int qId = 1;
+ private static int rqId = 2;
+ private static long popTime = System.currentTimeMillis();
+ private static long invisibleTime = 5000;
+
+ @Test
+ public void testBatchAckSerializerDeserializer() {
+ List<Long> ackOffsetList = Arrays.asList(startOffset + 1, startOffset
+ 3, startOffset + 5);
+ BatchAck batchAck = new BatchAck();
+ batchAck.setConsumerGroup(cid);
+ batchAck.setTopic(topic);
+ batchAck.setRetry("0");
+ batchAck.setStartOffset(startOffset);
+ batchAck.setQueueId(qId);
+ batchAck.setReviveQueueId(rqId);
+ batchAck.setPopTime(popTime);
+ batchAck.setInvisibleTime(invisibleTime);
+ batchAck.setBitSet(new BitSet());
+ for (Long offset : ackOffsetList) {
+ batchAck.getBitSet().set((int) (offset - startOffset));
+ }
+ String jsonStr = JSON.toJSONString(batchAck);
+
+ BatchAck bAck = JSON.parseObject(jsonStr, BatchAck.class);
+ assertThat(bAck.getConsumerGroup()).isEqualTo(cid);
+ assertThat(bAck.getTopic()).isEqualTo(topic);
+ assertThat(bAck.getStartOffset()).isEqualTo(startOffset);
+ assertThat(bAck.getQueueId()).isEqualTo(qId);
+ assertThat(bAck.getReviveQueueId()).isEqualTo(rqId);
+ assertThat(bAck.getPopTime()).isEqualTo(popTime);
+ assertThat(bAck.getInvisibleTime()).isEqualTo(invisibleTime);
+ for (int i = 0; i < bAck.getBitSet().length(); i++) {
+ long ackOffset = startOffset + i;
+ if (ackOffsetList.contains(ackOffset)) {
+ assertThat(bAck.getBitSet().get(i)).isTrue();
+ } else {
+ assertThat(bAck.getBitSet().get(i)).isFalse();
+ }
+ }
+ }
+
+ @Test
+ public void testWithBatchAckMessageRequestBody() {
+ List<Long> ackOffsetList = Arrays.asList(startOffset + 1, startOffset
+ 3, startOffset + 5);
+ BatchAck batchAck = new BatchAck();
+ batchAck.setConsumerGroup(cid);
+ batchAck.setTopic(topic);
+ batchAck.setRetry("0");
+ batchAck.setStartOffset(startOffset);
+ batchAck.setQueueId(qId);
+ batchAck.setReviveQueueId(rqId);
+ batchAck.setPopTime(popTime);
+ batchAck.setInvisibleTime(invisibleTime);
+ batchAck.setBitSet(new BitSet());
+ for (Long offset : ackOffsetList) {
+ batchAck.getBitSet().set((int) (offset - startOffset));
+ }
+
+ BatchAckMessageRequestBody batchAckMessageRequestBody = new
BatchAckMessageRequestBody();
+ batchAckMessageRequestBody.setAcks(Arrays.asList(batchAck));
+ byte[] bytes = batchAckMessageRequestBody.encode();
+ BatchAckMessageRequestBody reqBody =
BatchAckMessageRequestBody.decode(bytes, BatchAckMessageRequestBody.class);
+ BatchAck bAck = reqBody.getAcks().get(0);
+ assertThat(bAck.getConsumerGroup()).isEqualTo(cid);
+ assertThat(bAck.getTopic()).isEqualTo(topic);
+ assertThat(bAck.getStartOffset()).isEqualTo(startOffset);
+ assertThat(bAck.getQueueId()).isEqualTo(qId);
+ assertThat(bAck.getReviveQueueId()).isEqualTo(rqId);
+ assertThat(bAck.getPopTime()).isEqualTo(popTime);
+ assertThat(bAck.getInvisibleTime()).isEqualTo(invisibleTime);
+ for (int i = 0; i < bAck.getBitSet().length(); i++) {
+ long ackOffset = startOffset + i;
+ if (ackOffsetList.contains(ackOffset)) {
+ assertThat(bAck.getBitSet().get(i)).isTrue();
+ } else {
+ assertThat(bAck.getBitSet().get(i)).isFalse();
+ }
+ }
+ }
+}