This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6238caaac [ISSUE #6841] new feature: pop batch ack implementation broker-side (#6842)
6238caaac is described below

commit 6238caaac92fb1870f5eb234ddce86f3be045c79
Author: Quan <[email protected]>
AuthorDate: Mon Jun 19 11:06:50 2023 +0800

    [ISSUE #6841] new feature: pop batch ack implementation broker-side (#6842)
---
 .../apache/rocketmq/broker/BrokerController.java   |   3 +
 .../broker/processor/AckMessageProcessor.java      | 288 +++++++++++++--------
 .../broker/processor/PopBufferMergeService.java    |  21 +-
 .../processor/PopInflightMessageCounter.java       |  22 +-
 .../broker/processor/AckMessageProcessorTest.java  | 252 +++++++++++++++++-
 .../processor/PopInflightMessageCounterTest.java   |  13 +-
 .../protocol/BitSetSerializerDeserializer.java     |  52 ++++
 .../rocketmq/remoting/protocol/RequestCode.java    |   1 +
 .../rocketmq/remoting/protocol/body/BatchAck.java  | 131 ++++++++++
 .../protocol/body/BatchAckMessageRequestBody.java  |  43 +++
 .../remoting/protocol/header/ExtraInfoUtil.java    |  13 +-
 .../remoting/protocol/body/BatchAckTest.java       | 112 ++++++++
 12 files changed, 809 insertions(+), 142 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 7be1f20d9..03e9b3241 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1037,6 +1037,9 @@ public class BrokerController {
          */
         this.remotingServer.registerProcessor(RequestCode.ACK_MESSAGE, 
this.ackMessageProcessor, this.ackMessageExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.ACK_MESSAGE, 
this.ackMessageProcessor, this.ackMessageExecutor);
+
+        this.remotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, 
this.ackMessageProcessor, this.ackMessageExecutor);
+        
this.fastRemotingServer.registerProcessor(RequestCode.BATCH_ACK_MESSAGE, 
this.ackMessageProcessor, this.ackMessageExecutor);
         /**
          * ChangeInvisibleTimeProcessor
          */
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index fa1c0793e..2140aa881 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -36,18 +36,22 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.BatchAck;
+import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
 import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.pop.AckMsg;
+import org.apache.rocketmq.store.pop.BatchAckMsg;
 
 public class AckMessageProcessor implements NettyRequestProcessor {
     private static final Logger POP_LOGGER = 
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
     private final BrokerController brokerController;
-    private String reviveTopic;
-    private PopReviveService[] popReviveServices;
+    private final String reviveTopic;
+    private final PopReviveService[] popReviveServices;
 
     public AckMessageProcessor(final BrokerController brokerController) {
         this.brokerController = brokerController;
@@ -93,7 +97,7 @@ public class AckMessageProcessor implements NettyRequestProcessor {
 
     @Override
     public RemotingCommand processRequest(final ChannelHandlerContext ctx,
-        RemotingCommand request) throws RemotingCommandException {
+                                          RemotingCommand request) throws 
RemotingCommandException {
         return this.processRequest(ctx.channel(), request, true);
     }
 
@@ -103,135 +107,209 @@ public class AckMessageProcessor implements NettyRequestProcessor {
     }
 
     private RemotingCommand processRequest(final Channel channel, 
RemotingCommand request,
-        boolean brokerAllowSuspend) throws RemotingCommandException {
-        final AckMessageRequestHeader requestHeader = 
(AckMessageRequestHeader) 
request.decodeCommandCustomHeader(AckMessageRequestHeader.class);
-        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
-        AckMsg ackMsg = new AckMsg();
-        RemotingCommand response = 
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
+                                           boolean brokerAllowSuspend) throws 
RemotingCommandException {
+        AckMessageRequestHeader requestHeader;
+        BatchAckMessageRequestBody reqBody = null;
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
         response.setOpaque(request.getOpaque());
-        TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
-        if (null == topicConfig) {
-            POP_LOGGER.error("The topic {} not exist, consumer: {} ", 
requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
-            response.setCode(ResponseCode.TOPIC_NOT_EXIST);
-            response.setRemark(String.format("topic[%s] not exist, apply first 
please! %s", requestHeader.getTopic(), 
FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
-            return response;
-        }
+        if (request.getCode() == RequestCode.ACK_MESSAGE) {
+            requestHeader = (AckMessageRequestHeader) 
request.decodeCommandCustomHeader(AckMessageRequestHeader.class);
 
-        if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums() || 
requestHeader.getQueueId() < 0) {
-            String errorInfo = String.format("queueId[%d] is illegal, 
topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
-                requestHeader.getQueueId(), requestHeader.getTopic(), 
topicConfig.getReadQueueNums(), channel.remoteAddress());
-            POP_LOGGER.warn(errorInfo);
-            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
-            response.setRemark(errorInfo);
-            return response;
-        }
-        long minOffset = 
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
 requestHeader.getQueueId());
-        long maxOffset = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(),
 requestHeader.getQueueId());
-        if (requestHeader.getOffset() < minOffset || requestHeader.getOffset() 
> maxOffset) {
-            String errorInfo = String.format("offset is illegal, key:%s@%d, 
commit:%d, store:%d~%d",
-                requestHeader.getTopic(), requestHeader.getQueueId(), 
requestHeader.getOffset(), minOffset, maxOffset);
-            POP_LOGGER.warn(errorInfo);
-            response.setCode(ResponseCode.NO_MESSAGE);
-            response.setRemark(errorInfo);
-            return response;
-        }
-        String[] extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
-
-        ackMsg.setAckOffset(requestHeader.getOffset());
-        ackMsg.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfo));
-        ackMsg.setConsumerGroup(requestHeader.getConsumerGroup());
-        ackMsg.setTopic(requestHeader.getTopic());
-        ackMsg.setQueueId(requestHeader.getQueueId());
-        ackMsg.setPopTime(ExtraInfoUtil.getPopTime(extraInfo));
-        ackMsg.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));
-
-        int rqId = ExtraInfoUtil.getReviveQid(extraInfo);
-        long invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
-
-        this.brokerController.getBrokerStatsManager().incBrokerAckNums(1);
-        
this.brokerController.getBrokerStatsManager().incGroupAckNums(requestHeader.getConsumerGroup(),
 requestHeader.getTopic(), 1);
-
-        if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
-            // order
-            String lockKey = requestHeader.getTopic() + PopAckConstants.SPLIT
-                + requestHeader.getConsumerGroup() + PopAckConstants.SPLIT + 
requestHeader.getQueueId();
-            long oldOffset = 
this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
-                requestHeader.getTopic(), requestHeader.getQueueId());
-            if (requestHeader.getOffset() < oldOffset) {
+            TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+            if (null == topicConfig) {
+                POP_LOGGER.error("The topic {} not exist, consumer: {} ", 
requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));
+                response.setCode(ResponseCode.TOPIC_NOT_EXIST);
+                response.setRemark(String.format("topic[%s] not exist, apply 
first please! %s", requestHeader.getTopic(), 
FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));
                 return response;
             }
-            while 
(!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey))
 {
+
+            if (requestHeader.getQueueId() >= topicConfig.getReadQueueNums() 
|| requestHeader.getQueueId() < 0) {
+                String errorInfo = String.format("queueId[%d] is illegal, 
topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",
+                        requestHeader.getQueueId(), requestHeader.getTopic(), 
topicConfig.getReadQueueNums(), channel.remoteAddress());
+                POP_LOGGER.warn(errorInfo);
+                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+                response.setRemark(errorInfo);
+                return response;
             }
-            try {
-                oldOffset = 
this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
-                    requestHeader.getTopic(), requestHeader.getQueueId());
-                if (requestHeader.getOffset() < oldOffset) {
-                    return response;
+
+            long minOffset = 
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
 requestHeader.getQueueId());
+            long maxOffset = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(),
 requestHeader.getQueueId());
+            if (requestHeader.getOffset() < minOffset || 
requestHeader.getOffset() > maxOffset) {
+                String errorInfo = String.format("offset is illegal, 
key:%s@%d, commit:%d, store:%d~%d",
+                        requestHeader.getTopic(), requestHeader.getQueueId(), 
requestHeader.getOffset(), minOffset, maxOffset);
+                POP_LOGGER.warn(errorInfo);
+                response.setCode(ResponseCode.NO_MESSAGE);
+                response.setRemark(errorInfo);
+                return response;
+            }
+
+            appendAck(requestHeader, null, response, channel, null);
+        } else if (request.getCode() == RequestCode.BATCH_ACK_MESSAGE) {
+            if (request.getBody() != null) {
+                reqBody = BatchAckMessageRequestBody.decode(request.getBody(), 
BatchAckMessageRequestBody.class);
+            }
+            if (reqBody == null || reqBody.getAcks() == null || 
reqBody.getAcks().isEmpty()) {
+                response.setCode(ResponseCode.NO_MESSAGE);
+                return response;
+            }
+            for (BatchAck bAck : reqBody.getAcks()) {
+                appendAck(null, bAck, response, channel, 
reqBody.getBrokerName());
+            }
+        } else {
+            POP_LOGGER.error("AckMessageProcessor failed to process 
RequestCode: {}, consumer: {} ", request.getCode(), 
RemotingHelper.parseChannelRemoteAddr(channel));
+            response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+            response.setRemark(String.format("AckMessageProcessor failed to 
process RequestCode: %d", request.getCode()));
+            return response;
+        }
+        return response;
+    }
+
+    private void appendAck(final AckMessageRequestHeader requestHeader, final 
BatchAck batchAck, final RemotingCommand response, final Channel channel, 
String brokerName) {
+        String[] extraInfo;
+        String consumeGroup, topic;
+        int qId, rqId;
+        long startOffset, ackOffset;
+        long popTime, invisibleTime;
+        AckMsg ackMsg;
+        int ackCount = 0;
+        if (batchAck == null) {
+            // single ack
+            extraInfo = ExtraInfoUtil.split(requestHeader.getExtraInfo());
+            brokerName = ExtraInfoUtil.getBrokerName(extraInfo);
+            consumeGroup = requestHeader.getConsumerGroup();
+            topic = requestHeader.getTopic();
+            qId = requestHeader.getQueueId();
+            rqId = ExtraInfoUtil.getReviveQid(extraInfo);
+            startOffset = ExtraInfoUtil.getCkQueueOffset(extraInfo);
+            ackOffset = requestHeader.getOffset();
+            popTime = ExtraInfoUtil.getPopTime(extraInfo);
+            invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
+
+            if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
+                // order
+                String lockKey = topic + PopAckConstants.SPLIT + consumeGroup 
+ PopAckConstants.SPLIT + qId;
+                long oldOffset = 
this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, 
topic, qId);
+                if (ackOffset < oldOffset) {
+                    return;
+                }
+                while 
(!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey))
 {
                 }
-                long nextOffset = 
brokerController.getConsumerOrderInfoManager().commitAndNext(
-                    requestHeader.getTopic(), requestHeader.getConsumerGroup(),
-                    requestHeader.getQueueId(), requestHeader.getOffset(),
-                    ExtraInfoUtil.getPopTime(extraInfo));
-                if (nextOffset > -1) {
-                    if 
(!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
-                        requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), requestHeader.getQueueId())) {
-                        
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
-                            requestHeader.getConsumerGroup(), 
requestHeader.getTopic(), requestHeader.getQueueId(), nextOffset);
+                try {
+                    oldOffset = 
this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, 
topic, qId);
+                    if (ackOffset < oldOffset) {
+                        return;
                     }
-                    if 
(!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, 
requestHeader.getTopic(),
-                        requestHeader.getConsumerGroup(), 
requestHeader.getQueueId(), invisibleTime)) {
-                        
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
-                            requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), requestHeader.getQueueId());
+                    long nextOffset = 
brokerController.getConsumerOrderInfoManager().commitAndNext(
+                            topic, consumeGroup,
+                            qId, ackOffset,
+                            popTime);
+                    if (nextOffset > -1) {
+                        if 
(!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
+                                topic, consumeGroup, qId)) {
+                            
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
+                                    consumeGroup, topic, qId, nextOffset);
+                        }
+                        if 
(!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
+                                consumeGroup, qId, invisibleTime)) {
+                            
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
+                                    topic, consumeGroup, qId);
+                        }
+                    } else if (nextOffset == -1) {
+                        String errorInfo = String.format("offset is illegal, 
key:%s, old:%d, commit:%d, next:%d, %s",
+                                lockKey, oldOffset, ackOffset, nextOffset, 
channel.remoteAddress());
+                        POP_LOGGER.warn(errorInfo);
+                        response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+                        response.setRemark(errorInfo);
+                        return;
                     }
-                } else if (nextOffset == -1) {
-                    String errorInfo = String.format("offset is illegal, 
key:%s, old:%d, commit:%d, next:%d, %s",
-                        lockKey, oldOffset, requestHeader.getOffset(), 
nextOffset, channel.remoteAddress());
-                    POP_LOGGER.warn(errorInfo);
-                    response.setCode(ResponseCode.MESSAGE_ILLEGAL);
-                    response.setRemark(errorInfo);
-                    return response;
+                } finally {
+                    
this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
                 }
-            } finally {
-                
this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
+                
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
 consumeGroup, popTime, qId, ackCount);
+                return;
             }
-            decInFlightMessageNum(requestHeader);
-            return response;
+
+            ackMsg = new AckMsg();
+            ackCount = 1;
+        } else {
+            // batch ack
+            consumeGroup = batchAck.getConsumerGroup();
+            topic = ExtraInfoUtil.getRealTopic(batchAck.getTopic(), 
batchAck.getConsumerGroup(), 
ExtraInfoUtil.RETRY_TOPIC.equals(batchAck.getRetry()));
+            qId = batchAck.getQueueId();
+            rqId = batchAck.getReviveQueueId();
+            startOffset = batchAck.getStartOffset();
+            ackOffset = -1;
+            popTime = batchAck.getPopTime();
+            invisibleTime = batchAck.getInvisibleTime();
+
+            long minOffset = 
this.brokerController.getMessageStore().getMinOffsetInQueue(topic, qId);
+            long maxOffset = 
this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, qId);
+            if (minOffset == -1 || maxOffset == -1) {
+                POP_LOGGER.error("Illegal topic or queue found when batch ack 
{}", batchAck);
+                return;
+            }
+
+            BatchAckMsg batchAckMsg = new BatchAckMsg();
+            for (int i = 0; batchAck.getBitSet() != null && i < 
batchAck.getBitSet().length(); i++) {
+                if (!batchAck.getBitSet().get(i)) {
+                    continue;
+                }
+                long offset = startOffset + i;
+                if (offset < minOffset || offset > maxOffset) {
+                    continue;
+                }
+                batchAckMsg.getAckOffsetList().add(offset);
+            }
+            if (batchAckMsg.getAckOffsetList().isEmpty()) {
+                return;
+            }
+
+            ackMsg = batchAckMsg;
+            ackCount = batchAckMsg.getAckOffsetList().size();
         }
 
+        
this.brokerController.getBrokerStatsManager().incBrokerAckNums(ackCount);
+        
this.brokerController.getBrokerStatsManager().incGroupAckNums(consumeGroup, 
topic, ackCount);
+
+        ackMsg.setConsumerGroup(consumeGroup);
+        ackMsg.setTopic(topic);
+        ackMsg.setQueueId(qId);
+        ackMsg.setStartOffset(startOffset);
+        ackMsg.setAckOffset(ackOffset);
+        ackMsg.setPopTime(popTime);
+        ackMsg.setBrokerName(brokerName);
+
         if 
(this.brokerController.getPopMessageProcessor().getPopBufferMergeService().addAk(rqId,
 ackMsg)) {
-            decInFlightMessageNum(requestHeader);
-            return response;
+            
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
 consumeGroup, popTime, qId, ackCount);
+            return;
         }
 
+        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
         msgInner.setTopic(reviveTopic);
         
msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));
-        //msgInner.setQueueId(Integer.valueOf(extraInfo[3]));
         msgInner.setQueueId(rqId);
-        msgInner.setTags(PopAckConstants.ACK_TAG);
+        if (ackMsg instanceof BatchAckMsg) {
+            msgInner.setTags(PopAckConstants.BATCH_ACK_TAG);
+            
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
 PopMessageProcessor.genBatchAckUniqueId((BatchAckMsg) ackMsg));
+        } else {
+            msgInner.setTags(PopAckConstants.ACK_TAG);
+            
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
 PopMessageProcessor.genAckUniqueId(ackMsg));
+        }
         msgInner.setBornTimestamp(System.currentTimeMillis());
         msgInner.setBornHost(this.brokerController.getStoreHost());
         msgInner.setStoreHost(this.brokerController.getStoreHost());
-        msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + 
invisibleTime);
+        msgInner.setDeliverTimeMs(popTime + invisibleTime);
         
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX,
 PopMessageProcessor.genAckUniqueId(ackMsg));
         
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
         PutMessageResult putMessageResult = 
this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
         if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
-            && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
-            && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
-            && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
+                && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_DISK_TIMEOUT
+                && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.FLUSH_SLAVE_TIMEOUT
+                && putMessageResult.getPutMessageStatus() != 
PutMessageStatus.SLAVE_NOT_AVAILABLE) {
             POP_LOGGER.error("put ack msg error:" + putMessageResult);
         }
+        System.out.printf("put ack to store %s", ackMsg);
         PopMetricsManager.incPopReviveAckPutCount(ackMsg, 
putMessageResult.getPutMessageStatus());
-        decInFlightMessageNum(requestHeader);
-        return response;
+        
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
 consumeGroup, popTime, qId, ackCount);
     }
-
-    private void decInFlightMessageNum(AckMessageRequestHeader requestHeader) {
-        
this.brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(
-            requestHeader.getTopic(),
-            requestHeader.getConsumerGroup(),
-            requestHeader.getExtraInfo()
-        );
-    }
-
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index c5889f556..d7bc7c694 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -538,12 +538,23 @@ public class PopBufferMergeService extends ServiceThread {
                 return false;
             }
 
-            int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
-            if (indexOfAck > -1) {
-                markBitCAS(pointWrapper.getBits(), indexOfAck);
+            if (ackMsg instanceof BatchAckMsg) {
+                for (Long ackOffset : ((BatchAckMsg) 
ackMsg).getAckOffsetList()) {
+                    int indexOfAck = point.indexOfAck(ackOffset);
+                    if (indexOfAck > -1) {
+                        markBitCAS(pointWrapper.getBits(), indexOfAck);
+                    } else {
+                        POP_LOGGER.error("[PopBuffer]Invalid index of ack, 
reviveQid={}, {}, {}", reviveQid, ackMsg, point);
+                    }
+                }
             } else {
-                POP_LOGGER.error("[PopBuffer]Invalid index of ack, 
reviveQid={}, {}, {}", reviveQid, ackMsg, point);
-                return true;
+                int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
+                if (indexOfAck > -1) {
+                    markBitCAS(pointWrapper.getBits(), indexOfAck);
+                } else {
+                    POP_LOGGER.error("[PopBuffer]Invalid index of ack, 
reviveQid={}, {}, {}", reviveQid, ackMsg, point);
+                    return true;
+                }
             }
 
             if (brokerController.getBrokerConfig().isEnablePopLog()) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java
index 584cc54ba..6749af3d7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounter.java
@@ -16,18 +16,18 @@
  */
 package org.apache.rocketmq.broker.processor;
 
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.Pair;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class PopInflightMessageCounter {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
 
@@ -61,26 +61,24 @@ public class PopInflightMessageCounter {
         });
     }
 
-    public void decrementInFlightMessageNum(String topic, String group, String 
ckInfo) {
-        String[] ckInfoList = ExtraInfoUtil.split(ckInfo);
-        long popTime = ExtraInfoUtil.getPopTime(ckInfoList);
+    public void decrementInFlightMessageNum(String topic, String group, long 
popTime, int qId, int delta) {
         if (popTime < this.brokerController.getShouldStartTime()) {
             return;
         }
-        decrementInFlightMessageNum(topic, group, 
ExtraInfoUtil.getQueueId(ckInfoList));
+        decrementInFlightMessageNum(topic, group, qId, delta);
     }
 
     public void decrementInFlightMessageNum(PopCheckPoint checkPoint) {
         if (checkPoint.getPopTime() < 
this.brokerController.getShouldStartTime()) {
             return;
         }
-        decrementInFlightMessageNum(checkPoint.getTopic(), 
checkPoint.getCId(), checkPoint.getQueueId());
+        decrementInFlightMessageNum(checkPoint.getTopic(), 
checkPoint.getCId(), checkPoint.getQueueId(), 1);
     }
 
-    public void decrementInFlightMessageNum(String topic, String group, int 
queueId) {
+    private void decrementInFlightMessageNum(String topic, String group, int 
queueId, int delta) {
         topicInFlightMessageNum.computeIfPresent(buildKey(topic, group), (key, 
queueNum) -> {
             queueNum.computeIfPresent(queueId, (queueIdKey, counter) -> {
-                if (counter.decrementAndGet() <= 0) {
+                if (counter.addAndGet(-delta) <= 0) {
                     return null;
                 }
                 return counter;
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AckMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AckMessageProcessorTest.java
index 6719df08f..c0afb46c3 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AckMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AckMessageProcessorTest.java
@@ -18,12 +18,12 @@ package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
-import java.lang.reflect.Field;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ClientChannelInfo;
 import org.apache.rocketmq.broker.client.net.Broker2Client;
 import org.apache.rocketmq.broker.failover.EscapeBridge;
 import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
@@ -36,6 +36,8 @@ import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.BatchAck;
+import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
 import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumerData;
@@ -53,15 +55,25 @@ import org.mockito.Mockito;
 import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+
 import static 
org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
 public class AckMessageProcessorTest {
     private AckMessageProcessor ackMessageProcessor;
+    @Mock
+    private PopMessageProcessor popMessageProcessor;
     @Spy
     private BrokerController brokerController = new BrokerController(new 
BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new 
MessageStoreConfig());
     @Mock
@@ -77,6 +89,9 @@ public class AckMessageProcessorTest {
     @Mock
     private Broker2Client broker2Client;
 
+    private static final long MIN_OFFSET_IN_QUEUE = 100;
+    private static final long MAX_OFFSET_IN_QUEUE = 999;
+
     @Before
     public void init() throws IllegalAccessException, NoSuchFieldException {
         clientInfo = new ClientChannelInfo(channel, "127.0.0.1", 
LanguageCode.JAVA, 0);
@@ -91,19 +106,27 @@ public class AckMessageProcessorTest {
         
brokerController.getTopicConfigManager().getTopicConfigTable().put(topic, new 
TopicConfig());
         ConsumerData consumerData = createConsumerData(group, topic);
         brokerController.getConsumerManager().registerConsumer(
-            consumerData.getGroupName(),
-            clientInfo,
-            consumerData.getConsumeType(),
-            consumerData.getMessageModel(),
-            consumerData.getConsumeFromWhere(),
-            consumerData.getSubscriptionDataSet(),
-            false);
+                consumerData.getGroupName(),
+                clientInfo,
+                consumerData.getConsumeType(),
+                consumerData.getMessageModel(),
+                consumerData.getConsumeFromWhere(),
+                consumerData.getSubscriptionDataSet(),
+                false);
         ackMessageProcessor = new AckMessageProcessor(brokerController);
+
+        when(messageStore.getMinOffsetInQueue(anyString(), 
anyInt())).thenReturn(MIN_OFFSET_IN_QUEUE);
+        when(messageStore.getMaxOffsetInQueue(anyString(), 
anyInt())).thenReturn(MAX_OFFSET_IN_QUEUE);
+
+        
when(brokerController.getPopMessageProcessor()).thenReturn(popMessageProcessor);
     }
 
     @Test
     public void testProcessRequest_Success() throws RemotingCommandException, 
InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
         
when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new 
PutMessageResult(PutMessageStatus.PUT_OK, new 
AppendMessageResult(AppendMessageStatus.PUT_OK)));
+        PopBufferMergeService popBufferMergeService = 
mock(PopBufferMergeService.class);
+        when(popBufferMergeService.addAk(anyInt(), any())).thenReturn(false);
+        
when(popMessageProcessor.getPopBufferMergeService()).thenReturn(popBufferMergeService);
 
         int queueId = 0;
         long queueOffset = 0;
@@ -112,11 +135,11 @@ public class AckMessageProcessorTest {
         int reviveQid = 0;
         String brokerName = "test_broker";
         String extraInfo = ExtraInfoUtil.buildExtraInfo(queueOffset, popTime, 
invisibleTime, reviveQid,
-            topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR + 
queueOffset;
+                topic, brokerName, queueId) + MessageConst.KEY_SEPARATOR + 
queueOffset;
         AckMessageRequestHeader requestHeader = new AckMessageRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(0);
-        requestHeader.setOffset(0L);
+        requestHeader.setOffset(MIN_OFFSET_IN_QUEUE + 1);
         requestHeader.setConsumerGroup(group);
         requestHeader.setExtraInfo(extraInfo);
 
@@ -126,4 +149,213 @@ public class AckMessageProcessorTest {
         assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
         
assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
     }
+
+    /**
+     * Sends a command built with the SEND_MESSAGE request code and expects a
+     * MESSAGE_ILLEGAL response whose remark names that code.
+     */
+    @Test
+    public void testProcessRequest_WrongRequestCode() throws Exception {
+        final int unsupportedCode = RequestCode.SEND_MESSAGE;
+        RemotingCommand sendRequest = RemotingCommand.createRequestCommand(unsupportedCode, null);
+
+        RemotingCommand reply = ackMessageProcessor.processRequest(handlerContext, sendRequest);
+
+        String expectedRemark = "AckMessageProcessor failed to process RequestCode: " + unsupportedCode;
+        assertThat(reply.getCode()).isEqualTo(ResponseCode.MESSAGE_ILLEGAL);
+        assertThat(reply.getRemark()).isEqualTo(expectedRemark);
+    }
+
+    /**
+     * Acks against "wrongTopic", a name different from the topic that init() puts into the
+     * topic config table, and expects TOPIC_NOT_EXIST.
+     */
+    @Test
+    public void testSingleAck_TopicCheck() throws RemotingCommandException {
+        AckMessageRequestHeader ackHeader = new AckMessageRequestHeader();
+        ackHeader.setTopic("wrongTopic");
+        ackHeader.setQueueId(0);
+        ackHeader.setOffset(0L);
+
+        RemotingCommand ackRequest = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, ackHeader);
+        ackRequest.makeCustomHeaderToNet();
+        RemotingCommand reply = ackMessageProcessor.processRequest(handlerContext, ackRequest);
+
+        assertThat(reply.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
+        assertThat(reply.getRemark()).contains("not exist, apply first");
+    }
+
+    /**
+     * Acks with queue ids the processor is expected to reject and checks that each one is
+     * answered with MESSAGE_ILLEGAL and a remark quoting the offending id.
+     */
+    @Test
+    public void testSingleAck_QueueCheck() throws RemotingCommandException {
+        // -1 is negative; 17 is presumably above the queue count of a default TopicConfig (confirm).
+        for (int illegalQueueId : new int[] {-1, 17}) {
+            AckMessageRequestHeader ackHeader = new AckMessageRequestHeader();
+            ackHeader.setTopic(topic);
+            ackHeader.setQueueId(illegalQueueId);
+            ackHeader.setOffset(0L);
+
+            RemotingCommand ackRequest = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, ackHeader);
+            ackRequest.makeCustomHeaderToNet();
+            RemotingCommand reply = ackMessageProcessor.processRequest(handlerContext, ackRequest);
+
+            assertThat(reply.getCode()).isEqualTo(ResponseCode.MESSAGE_ILLEGAL);
+            assertThat(reply.getRemark()).contains("queueId[" + illegalQueueId + "] is illegal");
+        }
+    }
+
+    /**
+     * Acks one offset just below the stubbed minimum and one just above the stubbed maximum
+     * queue offset; both must be answered with NO_MESSAGE and an "offset is illegal" remark.
+     */
+    @Test
+    public void testSingleAck_OffsetCheck() throws RemotingCommandException {
+        // init() stubs the message store to report MIN_OFFSET_IN_QUEUE / MAX_OFFSET_IN_QUEUE.
+        long[] outOfRangeOffsets = {MIN_OFFSET_IN_QUEUE - 1, MAX_OFFSET_IN_QUEUE + 1};
+        for (long badOffset : outOfRangeOffsets) {
+            AckMessageRequestHeader ackHeader = new AckMessageRequestHeader();
+            ackHeader.setTopic(topic);
+            ackHeader.setQueueId(0);
+            ackHeader.setOffset(badOffset);
+
+            RemotingCommand ackRequest = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, ackHeader);
+            ackRequest.makeCustomHeaderToNet();
+            RemotingCommand reply = ackMessageProcessor.processRequest(handlerContext, ackRequest);
+
+            assertThat(reply.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
+            assertThat(reply.getRemark()).contains("offset is illegal");
+        }
+    }
+
+    /**
+     * A batch-ack request that carries nothing to ack must be answered with NO_MESSAGE.
+     * Three shapes are covered: no body, a body whose acks list was never set, and a body
+     * with an empty acks list.
+     */
+    @Test
+    public void testBatchAck_NoMessage() throws RemotingCommandException {
+        // Case 1: reqBody == null
+        RemotingCommand bodyless = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+        RemotingCommand bodylessReply = ackMessageProcessor.processRequest(handlerContext, bodyless);
+        assertThat(bodylessReply.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
+
+        // Case 2: reqBody.getAcks() == null
+        RemotingCommand nullAcks = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+        BatchAckMessageRequestBody unsetBody = new BatchAckMessageRequestBody();
+        nullAcks.setBody(unsetBody.encode());
+        RemotingCommand nullAcksReply = ackMessageProcessor.processRequest(handlerContext, nullAcks);
+        assertThat(nullAcksReply.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
+
+        // Case 3: reqBody.getAcks().isEmpty()
+        RemotingCommand emptyAcks = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+        BatchAckMessageRequestBody emptyBody = new BatchAckMessageRequestBody();
+        emptyBody.setAcks(new ArrayList<>());
+        emptyAcks.setBody(emptyBody.encode());
+        RemotingCommand emptyAcksReply = ackMessageProcessor.processRequest(handlerContext, emptyAcks);
+        assertThat(emptyAcksReply.getCode()).isEqualTo(ResponseCode.NO_MESSAGE);
+    }
+
+    /**
+     * A valid single ack must yield SUCCESS both when the mocked pop buffer accepts it
+     * (addAk returns true) and when it does not (addAk returns false) while the message
+     * store reports PUT_OK.
+     */
+    @Test
+    public void testSingleAck_appendAck() throws RemotingCommandException {
+        // true: buffer addAk OK; false: buffer addAk fail, store putMessage OK
+        for (boolean bufferAccepted : new boolean[] {true, false}) {
+            PopBufferMergeService mergeService = mock(PopBufferMergeService.class);
+            when(mergeService.addAk(anyInt(), any())).thenReturn(bufferAccepted);
+            when(popMessageProcessor.getPopBufferMergeService()).thenReturn(mergeService);
+            if (!bufferAccepted) {
+                // Stubbed only for this scenario, exactly as in the original second case.
+                PutMessageResult storedOk = new PutMessageResult(PutMessageStatus.PUT_OK, null);
+                when(messageStore.putMessage(any())).thenReturn(storedOk);
+            }
+
+            final long ackOffset = MIN_OFFSET_IN_QUEUE + 10;
+            AckMessageRequestHeader ackHeader = new AckMessageRequestHeader();
+            ackHeader.setTopic(topic);
+            ackHeader.setQueueId(0);
+            ackHeader.setOffset(ackOffset);
+            ackHeader.setConsumerGroup(MixAll.DEFAULT_CONSUMER_GROUP);
+            ackHeader.setExtraInfo("64 1666860736757 60000 4 0 broker-a 0 " + ackOffset);
+
+            RemotingCommand ackRequest = RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, ackHeader);
+            ackRequest.makeCustomHeaderToNet();
+            RemotingCommand reply = ackMessageProcessor.processRequest(handlerContext, ackRequest);
+
+            assertThat(reply.getCode()).isEqualTo(ResponseCode.SUCCESS);
+        }
+    }
+
+    /**
+     * A batch ack holding one entry with a single bit set must yield SUCCESS both when the
+     * mocked pop buffer accepts it (addAk returns true) and when it does not (addAk returns
+     * false) while the message store reports PUT_OK.
+     */
+    @Test
+    public void testBatchAck_appendAck() throws RemotingCommandException {
+        // true: buffer addAk OK; false: buffer addAk fail, store putMessage OK
+        for (boolean bufferAccepted : new boolean[] {true, false}) {
+            PopBufferMergeService mergeService = mock(PopBufferMergeService.class);
+            when(mergeService.addAk(anyInt(), any())).thenReturn(bufferAccepted);
+            when(popMessageProcessor.getPopBufferMergeService()).thenReturn(mergeService);
+            if (!bufferAccepted) {
+                // Stubbed only for this scenario, exactly as in the original second case.
+                PutMessageResult storedOk = new PutMessageResult(PutMessageStatus.PUT_OK, null);
+                when(messageStore.putMessage(any())).thenReturn(storedOk);
+            }
+
+            BitSet ackedBits = new BitSet();
+            ackedBits.set(1);
+            BatchAck singleBitAck = new BatchAck();
+            singleBitAck.setConsumerGroup(MixAll.DEFAULT_CONSUMER_GROUP);
+            singleBitAck.setTopic(topic);
+            singleBitAck.setStartOffset(MIN_OFFSET_IN_QUEUE);
+            singleBitAck.setBitSet(ackedBits);
+            singleBitAck.setRetry("0");
+
+            BatchAckMessageRequestBody batchBody = new BatchAckMessageRequestBody();
+            // The two scenarios originally used different one-element list factories; both are kept.
+            batchBody.setAcks(bufferAccepted ? Collections.singletonList(singleBitAck) : Arrays.asList(singleBitAck));
+
+            RemotingCommand batchRequest = RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+            batchRequest.setBody(batchBody.encode());
+            batchRequest.makeCustomHeaderToNet();
+            RemotingCommand reply = ackMessageProcessor.processRequest(handlerContext, batchRequest);
+
+            assertThat(reply.getCode()).isEqualTo(ResponseCode.SUCCESS);
+        }
+    }
+
 }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java
index 4e83ac749..dea59fc99 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopInflightMessageCounterTest.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.broker.processor;
 
 import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
 import org.apache.rocketmq.store.pop.PopCheckPoint;
 import org.junit.Test;
 
@@ -42,12 +41,10 @@ public class PopInflightMessageCounterTest {
         counter.incrementInFlightMessageNum(topic, group, 0, 3);
         assertEquals(3, counter.getGroupPopInFlightMessageNum(topic, group, 
0));
 
-        counter.decrementInFlightMessageNum(topic, group, 
ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis(),
-            0, 0, topic, "broker", 0));
+        counter.decrementInFlightMessageNum(topic, group, 
System.currentTimeMillis(), 0, 1);
         assertEquals(2, counter.getGroupPopInFlightMessageNum(topic, group, 
0));
 
-        counter.decrementInFlightMessageNum(topic, group, 
ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis() - 1000,
-            0, 0, topic, "broker", 0));
+        counter.decrementInFlightMessageNum(topic, group, 
System.currentTimeMillis() - 1000, 0, 1);
         assertEquals(2, counter.getGroupPopInFlightMessageNum(topic, group, 
0));
 
         PopCheckPoint popCheckPoint = new PopCheckPoint();
@@ -59,12 +56,10 @@ public class PopInflightMessageCounterTest {
         counter.decrementInFlightMessageNum(popCheckPoint);
         assertEquals(1, counter.getGroupPopInFlightMessageNum(topic, group, 
0));
 
-        counter.decrementInFlightMessageNum(topic, group, 
ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis(),
-            0, 0, topic, "broker", 0));
+        counter.decrementInFlightMessageNum(topic, group, 
System.currentTimeMillis(), 0 ,1);
         assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 
0));
 
-        counter.decrementInFlightMessageNum(topic, group, 
ExtraInfoUtil.buildExtraInfo(0, System.currentTimeMillis(),
-            0, 0, topic, "broker", 0));
+        counter.decrementInFlightMessageNum(topic, group, 
System.currentTimeMillis(), 0, 1);
         assertEquals(0, counter.getGroupPopInFlightMessageNum(topic, group, 
0));
     }
 
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/BitSetSerializerDeserializer.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/BitSetSerializerDeserializer.java
new file mode 100644
index 000000000..8f53c0250
--- /dev/null
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/BitSetSerializerDeserializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol;
+
+import com.alibaba.fastjson.parser.DefaultJSONParser;
+import com.alibaba.fastjson.parser.JSONToken;
+import com.alibaba.fastjson.parser.deserializer.ObjectDeserializer;
+import com.alibaba.fastjson.serializer.JSONSerializer;
+import com.alibaba.fastjson.serializer.ObjectSerializer;
+import com.alibaba.fastjson.serializer.SerializeWriter;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.BitSet;
+
+/**
+ * fastjson serializer/deserializer pair for {@link BitSet} values.
+ * <p>
+ * A bit set is written through {@code SerializeWriter#writeByteArray} using the bytes of
+ * {@link BitSet#toByteArray()}, and read back by parsing a {@code byte[]} and passing it to
+ * {@link BitSet#valueOf(byte[])}. A {@code null} bit set maps to JSON {@code null} in both
+ * directions.
+ */
+public class BitSetSerializerDeserializer implements ObjectSerializer, ObjectDeserializer {
+
+    /**
+     * Writes {@code object}, which must be a {@link BitSet} or {@code null}.
+     *
+     * @param serializer the active fastjson serializer; its {@code out} writer receives the value
+     * @param object     the bit set to write, or {@code null}
+     * @throws IOException declared by the {@code ObjectSerializer} contract
+     */
+    @Override
+    public void write(JSONSerializer serializer, Object object, Object fieldName, Type fieldType, int features) throws IOException {
+        SerializeWriter out = serializer.out;
+        if (object == null) {
+            // Mirror deserialze(), which already maps a missing byte array to null, instead of
+            // failing with a NullPointerException on the cast below.
+            out.writeNull();
+            return;
+        }
+        out.writeByteArray(((BitSet) object).toByteArray());
+    }
+
+    /**
+     * Reads a {@code byte[]} from the parser and rebuilds the bit set from it.
+     *
+     * @return the decoded {@link BitSet}, or {@code null} when the parser yields no byte array
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T> T deserialze(DefaultJSONParser parser, Type type, Object fieldName) {
+        byte[] bytes = parser.parseObject(byte[].class);
+        if (bytes != null) {
+            return (T) BitSet.valueOf(bytes);
+        }
+        return null;
+    }
+
+    /** Tells the parser that the value is expected to start with a string literal token. */
+    @Override
+    public int getFastMatchToken() {
+        return JSONToken.LITERAL_STRING;
+    }
+}
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
index ec87039b4..0b1a5e010 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RequestCode.java
@@ -88,6 +88,7 @@ public class RequestCode {
 
     public static final int POP_MESSAGE = 200050;
     public static final int ACK_MESSAGE = 200051;
+    public static final int BATCH_ACK_MESSAGE = 200151;
     public static final int PEEK_MESSAGE = 200052;
     public static final int CHANGE_MESSAGE_INVISIBLETIME = 200053;
     public static final int NOTIFICATION = 200054;
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAck.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAck.java
new file mode 100644
index 000000000..82dcd8567
--- /dev/null
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAck.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.body;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import org.apache.rocketmq.remoting.protocol.BitSetSerializerDeserializer;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+/**
+ * One entry of a batch ack request.
+ * <p>
+ * Every field is mapped to a short JSON name through {@link JSONField#name()}, with the long
+ * field name registered as an alternate name.
+ */
+public class BatchAck implements Serializable {
+    @JSONField(name = "c", alternateNames = {"consumerGroup"})
+    private String consumerGroup;
+    @JSONField(name = "t", alternateNames = {"topic"})
+    private String topic;
+    // "1" if the topic is a retry topic
+    @JSONField(name = "r", alternateNames = {"retry"})
+    private String retry;
+    @JSONField(name = "so", alternateNames = {"startOffset"})
+    private long startOffset;
+    @JSONField(name = "q", alternateNames = {"queueId"})
+    private int queueId;
+    @JSONField(name = "rq", alternateNames = {"reviveQueueId"})
+    private int reviveQueueId;
+    @JSONField(name = "pt", alternateNames = {"popTime"})
+    private long popTime;
+    @JSONField(name = "it", alternateNames = {"invisibleTime"})
+    private long invisibleTime;
+    // Bit set of acked offsets; presumably bit i stands for startOffset + i (confirm against the processor).
+    @JSONField(name = "b", alternateNames = {"bitSet"}, serializeUsing = BitSetSerializerDeserializer.class, deserializeUsing = BitSetSerializerDeserializer.class)
+    private BitSet bitSet;
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getRetry() {
+        return retry;
+    }
+
+    public void setRetry(String retry) {
+        this.retry = retry;
+    }
+
+    public long getStartOffset() {
+        return startOffset;
+    }
+
+    public void setStartOffset(long startOffset) {
+        this.startOffset = startOffset;
+    }
+
+    public int getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(int queueId) {
+        this.queueId = queueId;
+    }
+
+    public int getReviveQueueId() {
+        return reviveQueueId;
+    }
+
+    public void setReviveQueueId(int reviveQueueId) {
+        this.reviveQueueId = reviveQueueId;
+    }
+
+    public long getPopTime() {
+        return popTime;
+    }
+
+    public void setPopTime(long popTime) {
+        this.popTime = popTime;
+    }
+
+    public long getInvisibleTime() {
+        return invisibleTime;
+    }
+
+    public void setInvisibleTime(long invisibleTime) {
+        this.invisibleTime = invisibleTime;
+    }
+
+    public BitSet getBitSet() {
+        return bitSet;
+    }
+
+    public void setBitSet(BitSet bitSet) {
+        this.bitSet = bitSet;
+    }
+
+    /** Renders all fields in declaration order; the string fields are wrapped in single quotes. */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("BatchAck{");
+        text.append("consumerGroup='").append(consumerGroup).append('\'');
+        text.append(", topic='").append(topic).append('\'');
+        text.append(", retry='").append(retry).append('\'');
+        text.append(", startOffset=").append(startOffset);
+        text.append(", queueId=").append(queueId);
+        text.append(", reviveQueueId=").append(reviveQueueId);
+        text.append(", popTime=").append(popTime);
+        text.append(", invisibleTime=").append(invisibleTime);
+        text.append(", bitSet=").append(bitSet);
+        text.append('}');
+        return text.toString();
+    }
+}
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAckMessageRequestBody.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAckMessageRequestBody.java
new file mode 100644
index 000000000..f0e1a8c3c
--- /dev/null
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BatchAckMessageRequestBody.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.protocol.body;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.List;
+
+/**
+ * Body of a batch ack request: a broker name plus the list of {@link BatchAck} entries.
+ */
+public class BatchAckMessageRequestBody extends RemotingSerializable {
+    private String brokerName;
+    // No initializer: getAcks() returns null until setAcks(...) has been called.
+    private List<BatchAck> acks;
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    /** @return the ack entries, or {@code null} if none were ever set */
+    public List<BatchAck> getAcks() {
+        return acks;
+    }
+
+    public void setAcks(List<BatchAck> acks) {
+        this.acks = acks;
+    }
+}
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
index 7172ba959..9a5fa89ab 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/ExtraInfoUtil.java
@@ -26,7 +26,7 @@ import org.apache.rocketmq.common.message.MessageConst;
 
 public class ExtraInfoUtil {
     private static final String NORMAL_TOPIC = "0";
-    private static final String RETRY_TOPIC = "1";
+    public static final String RETRY_TOPIC = "1";
     private static final String QUEUE_OFFSET = "qo";
 
     public static String[] split(String extraInfo) {
@@ -75,6 +75,17 @@ public class ExtraInfoUtil {
         }
     }
 
+    /**
+     * Returns {@code topic} unchanged for a non-retry ack; otherwise returns the name built by
+     * {@code KeyBuilder.buildPopRetryTopic(topic, cid)}.
+     *
+     * @param topic   the topic name
+     * @param cid     the consumer group id passed to the retry-topic builder
+     * @param isRetry whether the retry topic name is wanted
+     */
+    public static String getRealTopic(String topic, String cid, boolean isRetry) {
+        if (!isRetry) {
+            return topic;
+        }
+        return KeyBuilder.buildPopRetryTopic(topic, cid);
+    }
+
+    /**
+     * Returns the fifth element (index 4) of the split extra-info array.
+     *
+     * @param extraInfoStrs the split extra-info fields
+     * @return the element at index 4
+     * @throws IllegalArgumentException if the array is {@code null} or has fewer than 5 elements
+     */
+    public static String getRetry(String[] extraInfoStrs) {
+        final int fieldCount = extraInfoStrs == null ? 0 : extraInfoStrs.length;
+        if (fieldCount < 5) {
+            throw new IllegalArgumentException("getRetry fail, extraInfoStrs length " + fieldCount);
+        }
+        return extraInfoStrs[4];
+    }
+
     public static String getBrokerName(String[] extraInfoStrs) {
         if (extraInfoStrs == null || extraInfoStrs.length < 6) {
             throw new IllegalArgumentException("getBrokerName fail, 
extraInfoStrs length " + (extraInfoStrs == null ? 0 : extraInfoStrs.length));
diff --git 
a/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/body/BatchAckTest.java
 
b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/body/BatchAckTest.java
new file mode 100644
index 000000000..427a132d6
--- /dev/null
+++ 
b/remoting/src/test/java/org/apache/rocketmq/remoting/protocol/body/BatchAckTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting.protocol.body;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.rocketmq.common.MixAll;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class BatchAckTest {
+    private static String topic = "myTopic";
+    private static String cid = MixAll.DEFAULT_CONSUMER_GROUP;
+    private static long startOffset = 100;
+    private static int qId = 1;
+    private static int rqId = 2;
+    private static long popTime = System.currentTimeMillis();
+    private static long invisibleTime = 5000;
+
+    @Test
+    public void testBatchAckSerializerDeserializer() {
+        List<Long> ackOffsetList = Arrays.asList(startOffset + 1, startOffset + 3, startOffset + 5);
+        BatchAck batchAck = new BatchAck();
+        batchAck.setConsumerGroup(cid);
+        batchAck.setTopic(topic);
+        batchAck.setRetry("0");
+        batchAck.setStartOffset(startOffset);
+        batchAck.setQueueId(qId);
+        batchAck.setReviveQueueId(rqId);
+        batchAck.setPopTime(popTime);
+        batchAck.setInvisibleTime(invisibleTime);
+        batchAck.setBitSet(new BitSet());
+        for (Long offset : ackOffsetList) {
+            batchAck.getBitSet().set((int) (offset - startOffset));
+        }
+        String jsonStr = JSON.toJSONString(batchAck);
+
+        BatchAck bAck = JSON.parseObject(jsonStr, BatchAck.class);
+        assertThat(bAck.getConsumerGroup()).isEqualTo(cid);
+        assertThat(bAck.getTopic()).isEqualTo(topic);
+        assertThat(bAck.getStartOffset()).isEqualTo(startOffset);
+        assertThat(bAck.getQueueId()).isEqualTo(qId);
+        assertThat(bAck.getReviveQueueId()).isEqualTo(rqId);
+        assertThat(bAck.getPopTime()).isEqualTo(popTime);
+        assertThat(bAck.getInvisibleTime()).isEqualTo(invisibleTime);
+        for (int i = 0; i < bAck.getBitSet().length(); i++) {
+            long ackOffset = startOffset + i;
+            if (ackOffsetList.contains(ackOffset)) {
+                assertThat(bAck.getBitSet().get(i)).isTrue();
+            } else {
+                assertThat(bAck.getBitSet().get(i)).isFalse();
+            }
+        }
+    }
+
+    @Test
+    public void testWithBatchAckMessageRequestBody() {
+        List<Long> ackOffsetList = Arrays.asList(startOffset + 1, startOffset + 3, startOffset + 5);
+        BatchAck batchAck = new BatchAck();
+        batchAck.setConsumerGroup(cid);
+        batchAck.setTopic(topic);
+        batchAck.setRetry("0");
+        batchAck.setStartOffset(startOffset);
+        batchAck.setQueueId(qId);
+        batchAck.setReviveQueueId(rqId);
+        batchAck.setPopTime(popTime);
+        batchAck.setInvisibleTime(invisibleTime);
+        batchAck.setBitSet(new BitSet());
+        for (Long offset : ackOffsetList) {
+            batchAck.getBitSet().set((int) (offset - startOffset));
+        }
+
+        BatchAckMessageRequestBody batchAckMessageRequestBody = new BatchAckMessageRequestBody();
+        batchAckMessageRequestBody.setAcks(Arrays.asList(batchAck));
+        byte[] bytes = batchAckMessageRequestBody.encode();
+        BatchAckMessageRequestBody reqBody = BatchAckMessageRequestBody.decode(bytes, BatchAckMessageRequestBody.class);
+        BatchAck bAck = reqBody.getAcks().get(0);
+        assertThat(bAck.getConsumerGroup()).isEqualTo(cid);
+        assertThat(bAck.getTopic()).isEqualTo(topic);
+        assertThat(bAck.getStartOffset()).isEqualTo(startOffset);
+        assertThat(bAck.getQueueId()).isEqualTo(qId);
+        assertThat(bAck.getReviveQueueId()).isEqualTo(rqId);
+        assertThat(bAck.getPopTime()).isEqualTo(popTime);
+        assertThat(bAck.getInvisibleTime()).isEqualTo(invisibleTime);
+        for (int i = 0; i < bAck.getBitSet().length(); i++) {
+            long ackOffset = startOffset + i;
+            if (ackOffsetList.contains(ackOffset)) {
+                assertThat(bAck.getBitSet().get(i)).isTrue();
+            } else {
+                assertThat(bAck.getBitSet().get(i)).isFalse();
+            }
+        }
+    }
+}

Reply via email to