This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 72d796f2b [ISSUE #7205] support batch ack for pop orderly (#7206)
72d796f2b is described below
commit 72d796f2b20b3ec6aebca8c004d9275d7c749a95
Author: lk <[email protected]>
AuthorDate: Fri Aug 18 11:55:39 2023 +0800
[ISSUE #7205] support batch ack for pop orderly (#7206)
---
.../broker/processor/AckMessageProcessor.java | 99 +++++++------
.../rocketmq/client/impl/MQClientAPIImpl.java | 91 ++++++++++--
.../rocketmq/test/client/rmq/RMQPopClient.java | 22 +++
.../test/client/consumer/pop/BasePopNormally.java | 6 +
.../test/client/consumer/pop/BatchAckIT.java | 159 +++++++++++++++++++++
5 files changed, 322 insertions(+), 55 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 687811409..244b459d6 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import java.util.BitSet;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.metrics.PopMetricsManager;
import org.apache.rocketmq.common.KeyBuilder;
@@ -186,46 +187,7 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
- // order
- String lockKey = topic + PopAckConstants.SPLIT + consumeGroup
+ PopAckConstants.SPLIT + qId;
- long oldOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup,
topic, qId);
- if (ackOffset < oldOffset) {
- return;
- }
- while
(!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey))
{
- }
- try {
- oldOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup,
topic, qId);
- if (ackOffset < oldOffset) {
- return;
- }
- long nextOffset =
brokerController.getConsumerOrderInfoManager().commitAndNext(
- topic, consumeGroup,
- qId, ackOffset,
- popTime);
- if (nextOffset > -1) {
- if
(!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
- topic, consumeGroup, qId)) {
-
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
- consumeGroup, topic, qId, nextOffset);
- }
- if
(!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
- consumeGroup, qId, invisibleTime)) {
-
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
- topic, consumeGroup, qId);
- }
- } else if (nextOffset == -1) {
- String errorInfo = String.format("offset is illegal,
key:%s, old:%d, commit:%d, next:%d, %s",
- lockKey, oldOffset, ackOffset, nextOffset,
channel.remoteAddress());
- POP_LOGGER.warn(errorInfo);
- response.setCode(ResponseCode.MESSAGE_ILLEGAL);
- response.setRemark(errorInfo);
- return;
- }
- } finally {
-
this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
- }
-
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
consumeGroup, popTime, qId, ackCount);
+ ackOrderly(topic, consumeGroup, qId, ackOffset, popTime,
invisibleTime, channel, response);
return;
}
@@ -250,17 +212,22 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
}
BatchAckMsg batchAckMsg = new BatchAckMsg();
- for (int i = 0; batchAck.getBitSet() != null && i <
batchAck.getBitSet().length(); i++) {
- if (!batchAck.getBitSet().get(i)) {
- continue;
+ BitSet bitSet = batchAck.getBitSet();
+ for (int i = bitSet.nextSetBit(0); i >= 0; i = bitSet.nextSetBit(i
+ 1)) {
+ if (i == Integer.MAX_VALUE) {
+ break;
}
long offset = startOffset + i;
if (offset < minOffset || offset > maxOffset) {
continue;
}
- batchAckMsg.getAckOffsetList().add(offset);
+ if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE) {
+ ackOrderly(topic, consumeGroup, qId, offset, popTime,
invisibleTime, channel, response);
+ } else {
+ batchAckMsg.getAckOffsetList().add(offset);
+ }
}
- if (batchAckMsg.getAckOffsetList().isEmpty()) {
+ if (rqId == KeyBuilder.POP_ORDER_REVIVE_QUEUE ||
batchAckMsg.getAckOffsetList().isEmpty()) {
return;
}
@@ -311,4 +278,46 @@ public class AckMessageProcessor implements
NettyRequestProcessor {
PopMetricsManager.incPopReviveAckPutCount(ackMsg,
putMessageResult.getPutMessageStatus());
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
consumeGroup, popTime, qId, ackCount);
}
+
+ protected void ackOrderly(String topic, String consumeGroup, int qId, long
ackOffset, long popTime, long invisibleTime, Channel channel, RemotingCommand
response) {
+ String lockKey = topic + PopAckConstants.SPLIT + consumeGroup +
PopAckConstants.SPLIT + qId;
+ long oldOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup,
topic, qId);
+ if (ackOffset < oldOffset) {
+ return;
+ }
+ while
(!this.brokerController.getPopMessageProcessor().getQueueLockManager().tryLock(lockKey))
{
+ }
+ try {
+ oldOffset =
this.brokerController.getConsumerOffsetManager().queryOffset(consumeGroup,
topic, qId);
+ if (ackOffset < oldOffset) {
+ return;
+ }
+ long nextOffset =
brokerController.getConsumerOrderInfoManager().commitAndNext(
+ topic, consumeGroup,
+ qId, ackOffset,
+ popTime);
+ if (nextOffset > -1) {
+ if
(!this.brokerController.getConsumerOffsetManager().hasOffsetReset(
+ topic, consumeGroup, qId)) {
+
this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(),
+ consumeGroup, topic, qId, nextOffset);
+ }
+ if
(!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, topic,
+ consumeGroup, qId, invisibleTime)) {
+
this.brokerController.getPopMessageProcessor().notifyMessageArriving(
+ topic, consumeGroup, qId);
+ }
+ } else if (nextOffset == -1) {
+ String errorInfo = String.format("offset is illegal, key:%s,
old:%d, commit:%d, next:%d, %s",
+ lockKey, oldOffset, ackOffset, nextOffset,
channel.remoteAddress());
+ POP_LOGGER.warn(errorInfo);
+ response.setCode(ResponseCode.MESSAGE_ILLEGAL);
+ response.setRemark(errorInfo);
+ return;
+ }
+ } finally {
+
this.brokerController.getPopMessageProcessor().getQueueLockManager().unLock(lockKey);
+ }
+
brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(topic,
consumeGroup, popTime, qId, 1);
+ }
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 5101ffc8e..213c26fd6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -21,6 +21,7 @@ import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -54,6 +55,7 @@ import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.Pair;
@@ -76,7 +78,8 @@ import
org.apache.rocketmq.common.namesrv.NameServerUpdateCallback;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.sysflag.PullSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.common.BoundaryType;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
@@ -101,7 +104,10 @@ import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.BatchAck;
+import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
import org.apache.rocketmq.remoting.protocol.body.BrokerMemberGroup;
+import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.BrokerStatsData;
import org.apache.rocketmq.remoting.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.remoting.protocol.body.ClusterAclVersionInfo;
@@ -114,7 +120,6 @@ import
org.apache.rocketmq.remoting.protocol.body.EpochEntryCache;
import org.apache.rocketmq.remoting.protocol.body.GetConsumerStatusBody;
import org.apache.rocketmq.remoting.protocol.body.GroupList;
import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo;
-import org.apache.rocketmq.remoting.protocol.body.BrokerReplicasInfo;
import org.apache.rocketmq.remoting.protocol.body.KVTable;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.LockBatchResponseBody;
@@ -196,6 +201,10 @@ import
org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfig
import
org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.ViewBrokerStatsDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
+import
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
+import
org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
import
org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader;
@@ -207,10 +216,6 @@ import
org.apache.rocketmq.remoting.protocol.header.namesrv.GetRouteInfoRequestH
import
org.apache.rocketmq.remoting.protocol.header.namesrv.PutKVConfigRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
import
org.apache.rocketmq.remoting.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
-import
org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
-import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
-import
org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
-import
org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
import org.apache.rocketmq.remoting.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
@@ -221,8 +226,6 @@ import
org.apache.rocketmq.remoting.protocol.subscription.GroupForbidden;
import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.rpchook.DynamicalExtFieldRPCHook;
import org.apache.rocketmq.remoting.rpchook.StreamTypeRPCHook;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import static
org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
@@ -885,9 +888,77 @@ public class MQClientAPIImpl implements
NameServerUpdateCallback {
final String addr,
final long timeOut,
final AckCallback ackCallback,
- final AckMessageRequestHeader requestHeader //
+ final AckMessageRequestHeader requestHeader
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+ ackMessageAsync(addr, timeOut, ackCallback, requestHeader, null);
+ }
+
+ public void batchAckMessageAsync(
+ final String addr,
+ final long timeOut,
+ final AckCallback ackCallback,
+ final String topic,
+ final String consumerGroup,
+ final List<String> extraInfoList
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+ String brokerName = null;
+ Map<String, BatchAck> batchAckMap = new HashMap<>();
+ for (String extraInfo : extraInfoList) {
+ String[] extraInfoData = ExtraInfoUtil.split(extraInfo);
+ if (brokerName == null) {
+ brokerName = ExtraInfoUtil.getBrokerName(extraInfoData);
+ }
+ String mergeKey = ExtraInfoUtil.getRetry(extraInfoData) + "@" +
+ ExtraInfoUtil.getQueueId(extraInfoData) + "@" +
+ ExtraInfoUtil.getCkQueueOffset(extraInfoData) + "@" +
+ ExtraInfoUtil.getPopTime(extraInfoData);
+ BatchAck bAck = batchAckMap.computeIfAbsent(mergeKey, k -> {
+ BatchAck newBatchAck = new BatchAck();
+ newBatchAck.setConsumerGroup(consumerGroup);
+ newBatchAck.setTopic(topic);
+ newBatchAck.setRetry(ExtraInfoUtil.getRetry(extraInfoData));
+
newBatchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfoData));
+
newBatchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfoData));
+
newBatchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfoData));
+
newBatchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfoData));
+
newBatchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfoData));
+ newBatchAck.setBitSet(new BitSet());
+ return newBatchAck;
+ });
+ bAck.getBitSet().set((int)
(ExtraInfoUtil.getQueueOffset(extraInfoData) -
ExtraInfoUtil.getCkQueueOffset(extraInfoData)));
+ }
+
+ BatchAckMessageRequestBody requestBody = new
BatchAckMessageRequestBody();
+ requestBody.setBrokerName(brokerName);
+ requestBody.setAcks(new ArrayList<>(batchAckMap.values()));
+ batchAckMessageAsync(addr, timeOut, ackCallback, requestBody);
+ }
+
+ public void batchAckMessageAsync(
+ final String addr,
+ final long timeOut,
+ final AckCallback ackCallback,
+ final BatchAckMessageRequestBody requestBody
) throws RemotingException, MQBrokerException, InterruptedException {
- final RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+ ackMessageAsync(addr, timeOut, ackCallback, null, requestBody);
+ }
+
+ protected void ackMessageAsync(
+ final String addr,
+ final long timeOut,
+ final AckCallback ackCallback,
+ final AckMessageRequestHeader requestHeader,
+ final BatchAckMessageRequestBody requestBody
+ ) throws RemotingException, MQBrokerException, InterruptedException {
+ RemotingCommand request;
+ if (requestHeader != null) {
+ request =
RemotingCommand.createRequestCommand(RequestCode.ACK_MESSAGE, requestHeader);
+ } else {
+ request =
RemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+ if (requestBody != null) {
+ request.setBody(requestBody.encode());
+ }
+ }
this.remotingClient.invokeAsync(addr, request, timeOut, new
BaseInvokeCallback(MQClientAPIImpl.this) {
@Override
diff --git
a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
index 496bd6da4..09c60c0b4 100644
--- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
+++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.test.client.rmq;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.AckCallback;
@@ -140,6 +141,27 @@ public class RMQPopClient implements MQConsumer {
return future;
}
+ public CompletableFuture<AckResult> batchAckMessageAsync(String
brokerAddr, String topic, String consumerGroup,
+ List<String> extraInfoList) {
+ CompletableFuture<AckResult> future = new CompletableFuture<>();
+ try {
+ this.mqClientAPI.batchAckMessageAsync(brokerAddr, DEFAULT_TIMEOUT,
new AckCallback() {
+ @Override
+ public void onSuccess(AckResult ackResult) {
+ future.complete(ackResult);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ future.completeExceptionally(e);
+ }
+ }, topic, consumerGroup, extraInfoList);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ return future;
+ }
+
public CompletableFuture<AckResult> changeInvisibleTimeAsync(String
brokerAddr, String brokerName, String topic,
String consumerGroup, String extraInfo, long invisibleTime) {
String[] extraInfoStrs = ExtraInfoUtil.split(extraInfo);
diff --git
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
index 952fbe3f5..2e29b95a5 100644
---
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
+++
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopNormally.java
@@ -63,4 +63,10 @@ public class BasePopNormally extends BasePop {
brokerAddr, messageQueue, invisibleTime, maxNums, group, timeout,
true,
ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
}
+
+ protected CompletableFuture<PopResult> popMessageAsync(long invisibleTime,
int maxNums) {
+ return client.popMessageAsync(
+ brokerAddr, messageQueue, invisibleTime, maxNums, group, 3000,
false,
+ ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
+ }
}
diff --git
a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java
new file mode 100644
index 000000000..ec9153ccc
--- /dev/null
+++
b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BatchAckIT.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.test.client.consumer.pop;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.client.consumer.AckStatus;
+import org.apache.rocketmq.client.consumer.PopResult;
+import org.apache.rocketmq.client.consumer.PopStatus;
+import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
+import org.apache.rocketmq.common.constant.ConsumeInitMode;
+import org.apache.rocketmq.common.filter.ExpressionType;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.test.base.IntegrationTestBase;
+import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
+import org.apache.rocketmq.test.client.rmq.RMQPopClient;
+import org.apache.rocketmq.test.util.MQRandomUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
+
+public class BatchAckIT extends BasePop {
+
+ protected String topic;
+ protected String group;
+ protected RMQNormalProducer producer = null;
+ protected RMQPopClient client = null;
+ protected String brokerAddr;
+ protected MessageQueue messageQueue;
+
+ @Before
+ public void setUp() {
+ brokerAddr = brokerController1.getBrokerAddr();
+ topic = MQRandomUtils.getRandomTopic();
+ group = initConsumerGroup();
+ IntegrationTestBase.initTopic(topic, NAMESRV_ADDR, BROKER1_NAME, 8,
CQType.SimpleCQ, TopicMessageType.NORMAL);
+ producer = getProducer(NAMESRV_ADDR, topic);
+ client = getRMQPopClient();
+ messageQueue = new MessageQueue(topic, BROKER1_NAME, -1);
+ }
+
+ @After
+ public void tearDown() {
+ shutdown();
+ }
+
+ @Test
+ public void testBatchAckNormallyWithPopBuffer() throws Throwable {
+ brokerController1.getBrokerConfig().setEnablePopBufferMerge(true);
+ brokerController2.getBrokerConfig().setEnablePopBufferMerge(true);
+
+ testBatchAck(() -> {
+ try {
+ return popMessageAsync().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Test
+ public void testBatchAckNormallyWithOutPopBuffer() throws Throwable {
+ brokerController1.getBrokerConfig().setEnablePopBufferMerge(false);
+ brokerController2.getBrokerConfig().setEnablePopBufferMerge(false);
+
+ testBatchAck(() -> {
+ try {
+ return popMessageAsync().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Test
+ public void testBatchAckOrderly() throws Throwable {
+ testBatchAck(() -> {
+ try {
+ return popMessageOrderlyAsync().get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ public void testBatchAck(Supplier<PopResult> popResultSupplier) throws
Throwable {
+ // Send 10 messages but do not ack, let them enter the retry topic
+ producer.send(10);
+ AtomicInteger firstMsgRcvNum = new AtomicInteger();
+ await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
+ PopResult popResult = popResultSupplier.get();
+ if (popResult.getPopStatus().equals(PopStatus.FOUND)) {
+ firstMsgRcvNum.addAndGet(popResult.getMsgFoundList().size());
+ }
+ assertEquals(10, firstMsgRcvNum.get());
+ });
+ // sleep 6s, expect messages to enter the retry topic
+ TimeUnit.SECONDS.sleep(6);
+
+ producer.send(20);
+ List<String> extraInfoList = new ArrayList<>();
+ await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
+ PopResult popResult = popResultSupplier.get();
+ if (popResult.getPopStatus().equals(PopStatus.FOUND)) {
+ for (MessageExt messageExt : popResult.getMsgFoundList()) {
+
extraInfoList.add(messageExt.getProperty(MessageConst.PROPERTY_POP_CK));
+ }
+ }
+ assertEquals(30, extraInfoList.size());
+ });
+
+ AckResult ackResult = client.batchAckMessageAsync(brokerAddr, topic,
group, extraInfoList).get();
+ assertEquals(AckStatus.OK, ackResult.getStatus());
+
+ // sleep 6s, expected that messages that have been acked will not be
re-consumed
+ TimeUnit.SECONDS.sleep(6);
+ PopResult popResult = popResultSupplier.get();
+ assertEquals(PopStatus.POLLING_NOT_FOUND, popResult.getPopStatus());
+ }
+
+ private CompletableFuture<PopResult> popMessageAsync() {
+ return client.popMessageAsync(
+ brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30,
group, 3000, false,
+ ConsumeInitMode.MIN, false, ExpressionType.TAG, "*");
+ }
+
+ private CompletableFuture<PopResult> popMessageOrderlyAsync() {
+ return client.popMessageAsync(
+ brokerAddr, messageQueue, Duration.ofSeconds(3).toMillis(), 30,
group, 3000, false,
+ ConsumeInitMode.MIN, true, ExpressionType.TAG, "*", null);
+ }
+}