This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 72d796f2b [ISSUE #7205] support batch ack for pop orderly (#7206)
72d796f2b is described below

commit 72d796f2b20b3ec6aebca8c004d9275d7c749a95
Author: lk <[email protected]>
AuthorDate: Fri Aug 18 11:55:39 2023 +0800

    [ISSUE #7205] support batch ack for pop orderly (#7206)
---
 .../broker/processor/AckMessageProcessor.java      |  99 +++++++------
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  91 ++++++++++--
 .../rocketmq/test/client/rmq/RMQPopClient.java     |  22 +++
 .../test/client/consumer/pop/BasePopNormally.java  |   6 +
 .../test/client/consumer/pop/BatchAckIT.java       | 159 +++++++++++++++++++++
 5 files changed, 322 insertions(+), 55 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 687811409..244b459d6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
 import com.alibaba.fastjson.JSON;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import java.util.BitSet;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.common.KeyBuilder;
@@ -186,46 +187,7 @@ public class AckMessageProcessor implements NettyRequestProcessor {
             invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
 
             if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
-                // order
-                String lockKey = topic + PopAckConstants.SPLIT + consumeGroup 
+ PopAckConstants.SPLIT + qId;
-                long oldOffset = 
this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, 
topic, qId);
-                if (ackOffset < oldOffset) {
-                    return;
-                }
-                while 
(!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey))
 {
-                }
-                try {
-                    oldOffset = 
this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, 
topic, qId);
-                    if (ackOffset < oldOffset) {
-                        return;
-                    }
-                    long nextOffset = 
brokerController.getConsumerOrderInfoManager().commitAndNext(
-                            topic, consumeGroup,
-                            qId, ackOffset,
-                            popTime);
-                    if (nextOffset > -1) {
-                        if 
(!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
-                                topic, consumeGroup, qId)) {
-                            
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
-                                    consumeGroup, topic, qId, nextOffset);
-                        }
-                        if 
(!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
-                                consumeGroup, qId, invisibleTime)) {
-                            
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
-                                    topic, consumeGroup, qId);
-                        }
-                    } else if (nextOffset == -1) {
-                        String errorInfo = String.format("offset is illegal, 
key:%s, old:%d, commit:%d, next:%d, %s",
-                                lockKey, oldOffset, ackOffset, nextOffset, 
channel.remoteAddress());
-                        POP_LOGGER.warn(errorInfo);
-                        response.setCode(ResponseCode.MESSAGE_ILLEGAL);
-                        response.setRemark(errorInfo);
-                        return;
-                    }
-                } finally {
-                    
this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
-                }
-                
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
 consumeGroup, popTime, qId, ackCount);
+                ackOrderly(topic, consumeGroup, qId, ackOffset, popTime, 
invisibleTime, channel, response);
                 return;
             }
 
@@ -250,17 +212,22 @@ public class AckMessageProcessor implements NettyRequestProcessor {
             }
 
             BatchAckMsg batchAckMsg = new BatchAckMsg();
-            for (int i = 0; batchAck.getBitSet() != null && i < 
batchAck.getBitSet().length(); i++) {
-                if (!batchAck.getBitSet().get(i)) {
-                    continue;
+            BitSet bitSet = batchAck.getBitSet();
+            for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i 
+ 1)) {
+                if (i == Integer.MAX_VALUE) {
+                    break;
                 }
                 long offset = startOffset + i;
                 if (offset < minOffset || offset > maxOffset) {
                     continue;
                 }
-                batchAckMsg.getAckOffsetList().add(offset);
+                if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
+                    ackOrderly(topic, consumeGroup, qId, offset, popTime, 
invisibleTime, channel, response);
+                } else {
+                    batchAckMsg.getAckOffsetList().add(offset);
+                }
             }
-            if (batchAckMsg.getAckOffsetList().isEmpty()) {
+            if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE || 
batchAckMsg.getAckOffsetList().isEmpty()) {
                 return;
             }
 
@@ -311,4 +278,46 @@ public class AckMessageProcessor implements NettyRequestProcessor {
         PopMetricsManager.incPopReviveAckPutCount(ackMsg, 
putMessageResult.getPutMessageStatus());
         
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
 consumeGroup, popTime, qId, ackCount);
     }
+
+    protected void ackOrderly(String topic, String consumeGroup, int qId, long 
ackOffset, long popTime, long invisibleTime, Channel channel, RemotingCommand 
response) {
+        String lockKey = topic + PopAckConstants.SPLIT + consumeGroup + 
PopAckConstants.SPLIT + qId;
+        long oldOffset = 
this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, 
topic, qId);
+        if (ackOffset < oldOffset) {
+            return;
+        }
+        while 
(!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey))
 {
+        }
+        try {
+            oldOffset = 
this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup, 
topic, qId);
+            if (ackOffset < oldOffset) {
+                return;
+            }
+            long nextOffset = 
brokerController.getConsumerOrderInfoManager().commitAndNext(
+                topic, consumeGroup,
+                qId, ackOffset,
+                popTime);
+            if (nextOffset > -1) {
+                if 
(!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
+                    topic, consumeGroup, qId)) {
+                    
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
+                        consumeGroup, topic, qId, nextOffset);
+                }
+                if 
(!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
+                    consumeGroup, qId, invisibleTime)) {
+                    
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
+                        topic, consumeGroup, qId);
+                }
+            } else if (nextOffset == -1) {
+                String errorInfo = String.format("offset is illegal, key:%s, 
old:%d, commit:%d, next:%d, %s",
+                    lockKey, oldOffset, ackOffset, nextOffset, 
channel.remoteAddress());
+                POP_LOGGER.warn(errorInfo);
+                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+                response.setRemark(errorInfo);
+                return;
+            }
+        } finally {
+            
this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
+        }
+        
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
 consumeGroup, popTime, qId, 1);
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 5101ffc8e..213c26fd6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -21,6 +21,7 @@ import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -54,6 +55,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
 import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.MQVersion;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.Pair;
@@ -76,7 +78,8 @@ import org.apache.rocketmq.common.namesrv.NameServerUpdateCallback;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -101,7 +104,10 @@ import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
 import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.BatchAck;
+import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
 import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
 import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
@@ -114,7 +120,6 @@ import org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
 import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody;
 import org.apache.rocketmq.remoting.protocol.body.GroupList;
 import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
 import org.apache.rocketmq.remoting.protocol.body.KVTable;
 import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody;
@@ -196,6 +201,10 @@ import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfig
 import 
org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader;
@@ -207,10 +216,6 @@ import org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestH
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.PutKVConfigRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
 import 
org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
-import 
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
 import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -221,8 +226,6 @@ import org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
 import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.rpchook.DynamicalExtFieldRPCHook;
 import org.apache.rocketmq.remoting.rpchook.StreamTypeRPCHook;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 
 import static 
org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
 
@@ -885,9 +888,77 @@ public class MQClientAPIImpl implements NameServerUpdateCallback {
         final String addr,
         final long timeOut,
         final AckCallback ackCallback,
-        final AckMessageRequestHeader requestHeader //
+        final AckMessageRequestHeader requestHeader
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        ackMessageAsync(addr, timeOut, ackCallback, requestHeader, null);
+    }
+
+    public void batchAckMessageAsync(
+        final String addr,
+        final long timeOut,
+        final AckCallback ackCallback,
+        final String topic,
+        final String consumerGroup,
+        final List<String> extraInfoList
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        String brokerName = null;
+        Map<String, BatchAck> batchAckMap = new HashMap<>();
+        for (String extraInfo : extraInfoList) {
+            String[] extraInfoData = ExtraInfoUtil.split(extraInfo);
+            if (brokerName == null) {
+                brokerName = ExtraInfoUtil.getBrokerName(extraInfoData);
+            }
+            String mergeKey = ExtraInfoUtil.getRetry(extraInfoData) + "@" +
+                ExtraInfoUtil.getQueueId(extraInfoData) + "@" +
+                ExtraInfoUtil.getCkQueueOffset(extraInfoData) + "@" +
+                ExtraInfoUtil.getPopTime(extraInfoData);
+            BatchAck bAck = batchAckMap.computeIfAbsent(mergeKey, k -> {
+                BatchAck newBatchAck = new BatchAck();
+                newBatchAck.setConsumerGroup(consumerGroup);
+                newBatchAck.setTopic(topic);
+                newBatchAck.setRetry(ExtraInfoUtil.getRetry(extraInfoData));
+                
newBatchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfoData));
+                
newBatchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfoData));
+                
newBatchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfoData));
+                
newBatchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfoData));
+                
newBatchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfoData));
+                newBatchAck.setBitSet(new BitSet());
+                return newBatchAck;
+            });
+            bAck.getBitSet().set((int) 
(ExtraInfoUtil.getQueueOffset(extraInfoData) - 
ExtraInfoUtil.getCkQueueOffset(extraInfoData)));
+        }
+
+        BatchAckMessageRequestBody requestBody = new 
BatchAckMessageRequestBody();
+        requestBody.setBrokerName(brokerName);
+        requestBody.setAcks(new ArrayList<>(batchAckMap.values()));
+        batchAckMessageAsync(addr, timeOut, ackCallback, requestBody);
+    }
+
+    public void batchAckMessageAsync(
+        final String addr,
+        final long timeOut,
+        final AckCallback ackCallback,
+        final BatchAckMessageRequestBody requestBody
     ) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+        ackMessageAsync(addr, timeOut, ackCallback, null, requestBody);
+    }
+
+    protected void ackMessageAsync(
+        final String addr,
+        final long timeOut,
+        final AckCallback ackCallback,
+        final AckMessageRequestHeader requestHeader,
+        final BatchAckMessageRequestBody requestBody
+    ) throws RemotingException, MQBrokerException, InterruptedException {
+        RemotingCommand request;
+        if (requestHeader != null) {
+            request = 
RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+        } else {
+            request = 
RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+            if (requestBody != null) {
+                request.setBody(requestBody.encode());
+            }
+        }
         this.remotingClient.invokeAsync(addr, request, timeOut, new 
BaseInvokeCallback(MQClientAPIImpl.this) {
 
             @Override
diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
index 496bd6da4..09c60c0b4 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.test.client.rmq;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.AckCallback;
@@ -140,6 +141,27 @@ public class RMQPopClient implements MQConsumer {
         return future;
     }
 
+    public CompletableFuture<AckResult> batchAckMessageAsync(String 
brokerAddr, String topic, String consumerGroup,
+        List<String> extraInfoList) {
+        CompletableFuture<AckResult> future = new CompletableFuture<>();
+        try {
+            this.mqClientAPI.batchAckMessageAsync(brokerAddr, DEFAULT_TIMEOUT, 
new AckCallback() {
+                @Override
+                public void onSuccess(AckResult ackResult) {
+                    future.complete(ackResult);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    future.completeExceptionally(e);
+                }
+            }, topic, consumerGroup, extraInfoList);
+        } catch (Throwable t) {
+            future.completeExceptionally(t);
+        }
+        return future;
+    }
+
     public CompletableFuture<AckResult> changeInvisibleTimeAsync(String 
brokerAddr, String brokerName, String topic,
         String consumerGroup, String extraInfo, long invisibleTime) {
         String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
index 952fbe3f5..2e29b95a5 100644
--- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
@@ -63,4 +63,10 @@ public class BasePopNormally extends BasePop {
             brokerAddr, messageQueue, invisibleTime, maxNums, group, timeout, 
true,
             ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
     }
+
+    protected CompletableFuture<PopResult> popMessageAsync(long invisibleTime, 
int maxNums) {
+        return client.popMessageAsync(
+            brokerAddr, messageQueue, invisibleTime, maxNums, group, 3000, 
false,
+            ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
+    }
 }
diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java
new file mode 100644
index 000000000..ec9153ccc
--- /dev/null
+++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.pop;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.AckStatus;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
+import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.IntegrationTestBase;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.client.rmq.RMQPopClient;
+import org.apache.rocketmq.test.util.MQRandomUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+public class BatchAckIT extends BasePop {
+
+    protected String topic;
+    protected String group;
+    protected RMQNormalProducer producer = null;
+    protected RMQPopClient client = null;
+    protected String brokerAddr;
+    protected MessageQueue messageQueue;
+
+    @Before
+    public void setUp() {
+        brokerAddr = brokerController1.getBrokerAddr();
+        topic = MQRandomUtils.getRandomTopic();
+        group = initConsumerGroup();
+        IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 8, 
CQType.SimpleCQ, TopicMessageType.NORMAL);
+        producer = getProducer(NAMESRV_ADDR, topic);
+        client = getRMQPopClient();
+        messageQueue = new MessageQueue(topic, BROKER1_NAME, -1);
+    }
+
+    @After
+    public void tearDown() {
+        shutdown();
+    }
+
+    @Test
+    public void testBatchAckNormallyWithPopBuffer() throws Throwable {
+        brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
+        brokerController2.getBrokerConfig().setEnablePopBufferMerge(true);
+
+        testBatchAck(() -> {
+            try {
+                return popMessageAsync().get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    @Test
+    public void testBatchAckNormallyWithOutPopBuffer() throws Throwable {
+        brokerController1.getBrokerConfig().setEnablePopBufferMerge(false);
+        brokerController2.getBrokerConfig().setEnablePopBufferMerge(false);
+
+        testBatchAck(() -> {
+            try {
+                return popMessageAsync().get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    @Test
+    public void testBatchAckOrderly() throws Throwable {
+        testBatchAck(() -> {
+            try {
+                return popMessageOrderlyAsync().get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    public void testBatchAck(Supplier<PopResult> popResultSupplier) throws 
Throwable {
+        // Send 10 messages but do not ack, let them enter the retry topic
+        producer.send(10);
+        AtomicInteger firstMsgRcvNum = new AtomicInteger();
+        await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
+            PopResult popResult = popResultSupplier.get();
+            if (popResult.getPopStatus().equals(PopStatus.FOUND)) {
+                firstMsgRcvNum.addAndGet(popResult.getMsgFoundList().size());
+            }
+            assertEquals(10, firstMsgRcvNum.get());
+        });
+        // sleep 6s, expect messages to enter the retry topic
+        TimeUnit.SECONDS.sleep(6);
+
+        producer.send(20);
+        List<String> extraInfoList = new ArrayList<>();
+        await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
+            PopResult popResult = popResultSupplier.get();
+            if (popResult.getPopStatus().equals(PopStatus.FOUND)) {
+                for (MessageExt messageExt : popResult.getMsgFoundList()) {
+                    
extraInfoList.add(messageExt.getProperty(MessageConst.PROPERTY_POP_CK));
+                }
+            }
+            assertEquals(30, extraInfoList.size());
+        });
+
+        AckResult ackResult = client.batchAckMessageAsync(brokerAddr, topic, 
group, extraInfoList).get();
+        assertEquals(AckStatus.OK, ackResult.getStatus());
+
+        // sleep 6s, expected that messages that have been acked will not be 
re-consumed
+        TimeUnit.SECONDS.sleep(6);
+        PopResult popResult = popResultSupplier.get();
+        assertEquals(PopStatus.POLLING_NOT_FOUND, popResult.getPopStatus());
+    }
+
+    private CompletableFuture<PopResult> popMessageAsync() {
+        return client.popMessageAsync(
+            brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30, 
group, 3000, false,
+            ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
+    }
+
+    private CompletableFuture<PopResult> popMessageOrderlyAsync() {
+        return client.popMessageAsync(
+            brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30, 
group, 3000, false,
+            ConsumeInitMode.MIN, true, ExpressionType.TAG, "*", null);
+    }
+}

Reply via email to