This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 77e8e54b37 [ISSUE #7223] Support batch ack for grpc client in proxy (#7225)
77e8e54b37 is described below

commit 77e8e54b37c3fc3ea0beffc1ace6f5bf20af10d9
Author: lk <[email protected]>
AuthorDate: Wed Aug 23 15:56:39 2023 +0800

    [ISSUE #7223] Support batch ack for grpc client in proxy (#7225)
---
 .../client/impl/mqclient/MQClientAPIExt.java       |  26 +++
 .../apache/rocketmq/proxy/config/ProxyConfig.java  |  10 +
 .../proxy/grpc/v2/consumer/AckMessageActivity.java | 136 +++++++++----
 .../proxy/processor/AbstractProcessor.java         |   4 +-
 .../rocketmq/proxy/processor/BatchAckResult.java   |  53 +++++
 .../proxy/processor/ConsumerProcessor.java         |  64 ++++++
 .../proxy/processor/DefaultMessagingProcessor.java |   7 +
 .../proxy/processor/MessagingProcessor.java        |  18 ++
 .../service/message/ClusterMessageService.java     |  16 +-
 .../proxy/service/message/LocalMessageService.java |  58 ++++++
 .../proxy/service/message/MessageService.java      |   8 +
 .../service/message/ReceiptHandleMessage.java      |  39 ++++
 .../grpc/v2/consumer/AckMessageActivityTest.java   | 221 ++++++++++++++++++---
 .../proxy/processor/BaseProcessorTest.java         |  18 +-
 .../proxy/processor/ConsumerProcessorTest.java     | 115 +++++++++++
 .../proxy/service/mqclient/MQClientAPIExtTest.java |  12 ++
 16 files changed, 728 insertions(+), 77 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
index fb8f8d11fd..d7c8ef8d92 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
@@ -306,6 +306,32 @@ public class MQClientAPIExt extends MQClientAPIImpl {
         return future;
     }
 
+    /**
+     * Adapts the callback based {@code batchAckMessageAsync} overload to a
+     * {@link CompletableFuture}.
+     *
+     * @param brokerAddr broker address handed to the callback based overload
+     * @param topic topic handed to the callback based overload
+     * @param consumerGroup consumer group handed to the callback based overload
+     * @param extraInfoList extra-info strings of the messages to ack, passed through unchanged
+     * @param timeoutMillis timeout handed to the callback based overload
+     * @return a future completed with the {@link AckResult} delivered to the callback, or
+     *     completed exceptionally with the error reported by the callback or thrown
+     *     while issuing the request
+     */
+    public CompletableFuture<AckResult> batchAckMessageAsync(
+        String brokerAddr,
+        String topic,
+        String consumerGroup,
+        List<String> extraInfoList,
+        long timeoutMillis
+    ) {
+        final CompletableFuture<AckResult> resultFuture = new CompletableFuture<>();
+        try {
+            // Bridge both callback outcomes onto the future.
+            final AckCallback bridge = new AckCallback() {
+                @Override
+                public void onSuccess(AckResult ackResult) {
+                    resultFuture.complete(ackResult);
+                }
+
+                @Override
+                public void onException(Throwable cause) {
+                    resultFuture.completeExceptionally(cause);
+                }
+            };
+            this.batchAckMessageAsync(brokerAddr, timeoutMillis, bridge, topic, consumerGroup, extraInfoList);
+        } catch (Throwable cause) {
+            // A synchronous failure is reported through the future rather than thrown.
+            resultFuture.completeExceptionally(cause);
+        }
+        return resultFuture;
+    }
+
     public CompletableFuture<AckResult> changeInvisibleTimeAsync(
         String brokerAddr,
         String brokerName,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 39caaa0d91..76a2439196 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -250,6 +250,8 @@ public class ProxyConfig implements ConfigFile {
     private long remotingWaitTimeMillsInTopicRouteQueue = 3 * 1000;
     private long remotingWaitTimeMillsInDefaultQueue = 3 * 1000;
 
+    private boolean enableBatchAck = false;
+
     @Override
     public void initData() {
         parseDelayLevel();
@@ -1379,4 +1381,12 @@ public class ProxyConfig implements ConfigFile {
     public void setRemotingWaitTimeMillsInDefaultQueue(long 
remotingWaitTimeMillsInDefaultQueue) {
         this.remotingWaitTimeMillsInDefaultQueue = 
remotingWaitTimeMillsInDefaultQueue;
     }
+
+    /**
+     * @return the current value of the {@code enableBatchAck} switch
+     */
+    public boolean isEnableBatchAck() {
+        return this.enableBatchAck;
+    }
+
+    /**
+     * @param enableBatchAck new value of the {@code enableBatchAck} switch
+     */
+    public void setEnableBatchAck(final boolean enableBatchAck) {
+        this.enableBatchAck = enableBatchAck;
+    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
index 9a3a772017..97c716c8ff 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
@@ -31,12 +31,15 @@ import org.apache.rocketmq.client.consumer.AckStatus;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
 import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
 import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
 import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter;
 import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
+import org.apache.rocketmq.proxy.processor.BatchAckResult;
 import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
 
 public class AckMessageActivity extends AbstractMessingActivity {
 
@@ -50,60 +53,98 @@ public class AckMessageActivity extends 
AbstractMessingActivity {
 
         try {
             validateTopicAndConsumerGroup(request.getTopic(), 
request.getGroup());
-
-            CompletableFuture<AckMessageResultEntry>[] futures = new 
CompletableFuture[request.getEntriesCount()];
-            for (int i = 0; i < request.getEntriesCount(); i++) {
-                futures[i] = processAckMessage(ctx, request, 
request.getEntries(i));
+            String group = 
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
+            String topic = 
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic());
+            if (ConfigurationManager.getProxyConfig().isEnableBatchAck()) {
+                future = ackMessageInBatch(ctx, group, topic, request);
+            } else {
+                future = ackMessageOneByOne(ctx, group, topic, request);
             }
-            CompletableFuture.allOf(futures).whenComplete((val, throwable) -> {
-                if (throwable != null) {
-                    future.completeExceptionally(throwable);
-                    return;
-                }
+        } catch (Throwable t) {
+            future.completeExceptionally(t);
+        }
+        return future;
+    }
+
+    protected CompletableFuture<AckMessageResponse> 
ackMessageInBatch(ProxyContext ctx, String group, String topic, 
AckMessageRequest request) {
+        List<ReceiptHandleMessage> handleMessageList = new 
ArrayList<>(request.getEntriesCount());
 
+        for (AckMessageEntry ackMessageEntry : request.getEntriesList()) {
+            String handleString = getHandleString(ctx, group, request, 
ackMessageEntry);
+            handleMessageList.add(new 
ReceiptHandleMessage(ReceiptHandle.decode(handleString), 
ackMessageEntry.getMessageId()));
+        }
+        return this.messagingProcessor.batchAckMessage(ctx, handleMessageList, 
group, topic)
+            .thenApply(batchAckResultList -> {
+                AckMessageResponse.Builder responseBuilder = 
AckMessageResponse.newBuilder();
                 Set<Code> responseCodes = new HashSet<>();
-                List<AckMessageResultEntry> entryList = new ArrayList<>();
-                for (CompletableFuture<AckMessageResultEntry> entryFuture : 
futures) {
-                    AckMessageResultEntry entryResult = entryFuture.join();
-                    responseCodes.add(entryResult.getStatus().getCode());
-                    entryList.add(entryResult);
-                }
-                AckMessageResponse.Builder responseBuilder = 
AckMessageResponse.newBuilder()
-                    .addAllEntries(entryList);
-                if (responseCodes.size() > 1) {
-                    
responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.MULTIPLE_RESULTS,
 Code.MULTIPLE_RESULTS.name()));
-                } else if (responseCodes.size() == 1) {
-                    Code code = responseCodes.stream().findAny().get();
-                    
responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(code, 
code.name()));
-                } else {
-                    
responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR,
 "ack message result is empty"));
+                for (BatchAckResult batchAckResult : batchAckResultList) {
+                    AckMessageResultEntry entry = 
convertToAckMessageResultEntry(batchAckResult);
+                    responseBuilder.addEntries(entry);
+                    responseCodes.add(entry.getStatus().getCode());
                 }
-                future.complete(responseBuilder.build());
+                setAckResponseStatus(responseBuilder, responseCodes);
+                return responseBuilder.build();
             });
-        } catch (Throwable t) {
-            future.completeExceptionally(t);
+    }
+
+    protected AckMessageResultEntry 
convertToAckMessageResultEntry(BatchAckResult batchAckResult) {
+        ReceiptHandleMessage handleMessage = 
batchAckResult.getReceiptHandleMessage();
+        AckMessageResultEntry.Builder resultBuilder = 
AckMessageResultEntry.newBuilder()
+            .setMessageId(handleMessage.getMessageId())
+            
.setReceiptHandle(handleMessage.getReceiptHandle().getReceiptHandle());
+        if (batchAckResult.getProxyException() != null) {
+            
resultBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(batchAckResult.getProxyException()));
+        } else {
+            AckResult ackResult = batchAckResult.getAckResult();
+            if (AckStatus.OK.equals(ackResult.getStatus())) {
+                
resultBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK, 
Code.OK.name()));
+            } else {
+                
resultBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR,
 "ack failed: status is abnormal"));
+            }
         }
-        return future;
+        return resultBuilder.build();
     }
 
-    protected CompletableFuture<AckMessageResultEntry> 
processAckMessage(ProxyContext ctx, AckMessageRequest request,
+    /**
+     * Acks each entry of the request through its own {@code processAckMessage} call and
+     * merges the per-entry results into a single response.
+     *
+     * @param ctx proxy context of the calling client
+     * @param group consumer group, forwarded to {@code processAckMessage}
+     * @param topic topic, forwarded to {@code processAckMessage}
+     * @param request the ack request whose entries are processed
+     * @return a future holding the merged response; it completes exceptionally if any
+     *     per-entry future does
+     */
+    protected CompletableFuture<AckMessageResponse> ackMessageOneByOne(ProxyContext ctx, String group,
+        String topic, AckMessageRequest request) {
+        final int entryCount = request.getEntriesCount();
+        CompletableFuture<AckMessageResultEntry>[] entryFutures = new CompletableFuture[entryCount];
+        for (int index = 0; index < entryCount; index++) {
+            entryFutures[index] = processAckMessage(ctx, group, topic, request, request.getEntries(index));
+        }
+
+        CompletableFuture<AckMessageResponse> responseFuture = new CompletableFuture<>();
+        CompletableFuture.allOf(entryFutures).whenComplete((ignored, failure) -> {
+            if (failure != null) {
+                responseFuture.completeExceptionally(failure);
+                return;
+            }
+
+            // Every entry future is complete here, so join() returns immediately.
+            AckMessageResponse.Builder responseBuilder = AckMessageResponse.newBuilder();
+            Set<Code> distinctCodes = new HashSet<>();
+            for (CompletableFuture<AckMessageResultEntry> entryFuture : entryFutures) {
+                AckMessageResultEntry resultEntry = entryFuture.join();
+                distinctCodes.add(resultEntry.getStatus().getCode());
+                responseBuilder.addEntries(resultEntry);
+            }
+            setAckResponseStatus(responseBuilder, distinctCodes);
+            responseFuture.complete(responseBuilder.build());
+        });
+        return responseFuture;
+    }
+
+    protected CompletableFuture<AckMessageResultEntry> 
processAckMessage(ProxyContext ctx, String group, String topic, 
AckMessageRequest request,
         AckMessageEntry ackMessageEntry) {
         CompletableFuture<AckMessageResultEntry> future = new 
CompletableFuture<>();
 
         try {
-            String handleString = ackMessageEntry.getReceiptHandle();
-
-            String group = 
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
-            MessageReceiptHandle messageReceiptHandle = 
messagingProcessor.removeReceiptHandle(ctx, 
grpcChannelManager.getChannel(ctx.getClientID()), group, 
ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
-            if (messageReceiptHandle != null) {
-                handleString = messageReceiptHandle.getReceiptHandleStr();
-            }
+            String handleString = this.getHandleString(ctx, group, request, 
ackMessageEntry);
             CompletableFuture<AckResult> ackResultFuture = 
this.messagingProcessor.ackMessage(
                 ctx,
                 ReceiptHandle.decode(handleString),
                 ackMessageEntry.getMessageId(),
                 group,
-                
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic()));
+                topic
+            );
             ackResultFuture.thenAccept(result -> {
                 future.complete(convertToAckMessageResultEntry(ctx, 
ackMessageEntry, result));
             }).exceptionally(t -> {
@@ -139,4 +180,25 @@ public class AckMessageActivity extends 
AbstractMessingActivity {
             
.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR,
 "ack failed: status is abnormal"))
             .build();
     }
+
+    /**
+     * Sets the overall response status from the set of distinct per-entry codes:
+     * no code yields {@code INTERNAL_SERVER_ERROR}, exactly one code is reported as-is,
+     * and several different codes yield {@code MULTIPLE_RESULTS}.
+     *
+     * @param responseBuilder builder whose status is set
+     * @param responseCodes distinct status codes collected from the per-entry results
+     */
+    protected void setAckResponseStatus(AckMessageResponse.Builder responseBuilder, Set<Code> responseCodes) {
+        final int distinctCodes = responseCodes.size();
+        if (distinctCodes == 0) {
+            responseBuilder.setStatus(
+                ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR, "ack message result is empty"));
+            return;
+        }
+        if (distinctCodes == 1) {
+            Code onlyCode = responseCodes.stream().findAny().get();
+            responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(onlyCode, onlyCode.name()));
+            return;
+        }
+        responseBuilder.setStatus(
+            ResponseBuilder.getInstance().buildStatus(Code.MULTIPLE_RESULTS, Code.MULTIPLE_RESULTS.name()));
+    }
+
+    /**
+     * Resolves the receipt handle string to ack for one entry. The handle tracked by the
+     * messaging processor for this client channel, group and message is removed; if one
+     * was found its handle string is used, otherwise the handle sent in the entry.
+     *
+     * @param ctx proxy context, used to look up the client channel
+     * @param group consumer group the handle is tracked under
+     * @param request the enclosing ack request (not read by this method)
+     * @param ackMessageEntry the entry being acked
+     * @return the receipt handle string to decode and ack
+     */
+    protected String getHandleString(ProxyContext ctx, String group, AckMessageRequest request,
+        AckMessageEntry ackMessageEntry) {
+        final String clientHandle = ackMessageEntry.getReceiptHandle();
+        MessageReceiptHandle trackedHandle = messagingProcessor.removeReceiptHandle(
+            ctx,
+            grpcChannelManager.getChannel(ctx.getClientID()),
+            group,
+            ackMessageEntry.getMessageId(),
+            ackMessageEntry.getReceiptHandle());
+        return trackedHandle == null ? clientHandle : trackedHandle.getReceiptHandleStr();
+    }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
index b61c3df9e5..c63212c231 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
@@ -27,6 +27,8 @@ public abstract class AbstractProcessor extends 
AbstractStartAndShutdown {
     protected MessagingProcessor messagingProcessor;
     protected ServiceManager serviceManager;
 
+    protected static final ProxyException EXPIRED_HANDLE_PROXY_EXCEPTION = new 
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is 
expired");
+
     public AbstractProcessor(MessagingProcessor messagingProcessor,
         ServiceManager serviceManager) {
         this.messagingProcessor = messagingProcessor;
@@ -35,7 +37,7 @@ public abstract class AbstractProcessor extends 
AbstractStartAndShutdown {
 
     protected void validateReceiptHandle(ReceiptHandle handle) {
         if (handle.isExpired()) {
-            throw new 
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is 
expired");
+            throw EXPIRED_HANDLE_PROXY_EXCEPTION;
         }
     }
 }
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java
new file mode 100644
index 0000000000..dfb9c9b9e0
--- /dev/null
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor;
+
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
+
+/**
+ * Outcome of acking one message as part of a batch ack.
+ *
+ * <p>An instance carries the {@link ReceiptHandleMessage} it refers to plus either an
+ * {@link AckResult} or a {@link ProxyException}, depending on which constructor was
+ * used; the other one is {@code null}. All fields are final, so an instance never
+ * changes after construction.
+ */
+public class BatchAckResult {
+
+    private final ReceiptHandleMessage receiptHandleMessage;
+    private final AckResult ackResult;
+    private final ProxyException proxyException;
+
+    /**
+     * Creates a result that carries an {@link AckResult}.
+     *
+     * @param receiptHandleMessage the handle/message pair this result belongs to
+     * @param ackResult the ack result to carry
+     */
+    public BatchAckResult(ReceiptHandleMessage receiptHandleMessage,
+        AckResult ackResult) {
+        this.receiptHandleMessage = receiptHandleMessage;
+        this.ackResult = ackResult;
+        this.proxyException = null;
+    }
+
+    /**
+     * Creates a result that carries a {@link ProxyException}.
+     *
+     * @param receiptHandleMessage the handle/message pair this result belongs to
+     * @param proxyException the failure to carry
+     */
+    public BatchAckResult(ReceiptHandleMessage receiptHandleMessage,
+        ProxyException proxyException) {
+        this.receiptHandleMessage = receiptHandleMessage;
+        this.ackResult = null;
+        this.proxyException = proxyException;
+    }
+
+    /**
+     * @return the handle/message pair this result belongs to
+     */
+    public ReceiptHandleMessage getReceiptHandleMessage() {
+        return receiptHandleMessage;
+    }
+
+    /**
+     * @return the ack result, or {@code null} when this result was built from an exception
+     */
+    public AckResult getAckResult() {
+        return ackResult;
+    }
+
+    /**
+     * @return the failure, or {@code null} when this result was built from an ack result
+     */
+    public ProxyException getProxyException() {
+        return proxyException;
+    }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index 656a6339dc..f3522b3740 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -48,6 +48,7 @@ import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
 import org.apache.rocketmq.proxy.common.utils.FutureUtils;
 import org.apache.rocketmq.proxy.common.utils.ProxyUtils;
 import org.apache.rocketmq.proxy.service.ServiceManager;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
 import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
 import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
@@ -241,6 +242,69 @@ public class ConsumerProcessor extends AbstractProcessor {
         return FutureUtils.addExecutor(future, this.executor);
     }
 
+    /**
+     * Acks a list of messages, sending one batch request per broker.
+     *
+     * <p>Handles that are already expired are not sent; each gets a result carrying
+     * {@code EXPIRED_HANDLE_PROXY_EXCEPTION}. The remaining handles are grouped by the
+     * broker name in their receipt handle and each group goes through
+     * {@link #processBrokerHandle}. The returned list holds the expired-handle results
+     * first, followed by the per-broker results, so its order need not match the order
+     * of {@code handleMessageList}.
+     *
+     * @param ctx proxy context of the calling client
+     * @param handleMessageList handle/message pairs to ack
+     * @param consumerGroup consumer group the messages were received with
+     * @param topic topic of the messages
+     * @param timeoutMillis timeout passed to each per-broker request
+     * @return a future, wrapped via {@code FutureUtils.addExecutor} with this processor's
+     *     executor, holding one {@link BatchAckResult} per input handle
+     */
+    public CompletableFuture<List<BatchAckResult>> batchAckMessage(
+        ProxyContext ctx,
+        List<ReceiptHandleMessage> handleMessageList,
+        String consumerGroup,
+        String topic,
+        long timeoutMillis
+    ) {
+        CompletableFuture<List<BatchAckResult>> future = new CompletableFuture<>();
+        try {
+            List<BatchAckResult> batchAckResultList = new ArrayList<>(handleMessageList.size());
+            Map<String, List<ReceiptHandleMessage>> brokerHandleListMap = new HashMap<>();
+
+            for (ReceiptHandleMessage handleMessage : handleMessageList) {
+                if (handleMessage.getReceiptHandle().isExpired()) {
+                    batchAckResultList.add(new BatchAckResult(handleMessage, EXPIRED_HANDLE_PROXY_EXCEPTION));
+                    continue;
+                }
+                List<ReceiptHandleMessage> brokerHandleList = brokerHandleListMap.computeIfAbsent(
+                    handleMessage.getReceiptHandle().getBrokerName(), key -> new ArrayList<>());
+                brokerHandleList.add(handleMessage);
+            }
+
+            // Nothing left to send: every handle was expired (or the input was empty).
+            if (brokerHandleListMap.isEmpty()) {
+                return FutureUtils.addExecutor(CompletableFuture.completedFuture(batchAckResultList), this.executor);
+            }
+            Set<Map.Entry<String, List<ReceiptHandleMessage>>> brokerHandleListMapEntrySet =
+                brokerHandleListMap.entrySet();
+            CompletableFuture<List<BatchAckResult>>[] futures =
+                new CompletableFuture[brokerHandleListMapEntrySet.size()];
+            int futureIndex = 0;
+            for (Map.Entry<String, List<ReceiptHandleMessage>> entry : brokerHandleListMapEntrySet) {
+                futures[futureIndex++] = processBrokerHandle(ctx, consumerGroup, topic, entry.getValue(), timeoutMillis);
+            }
+            CompletableFuture.allOf(futures).whenComplete((val, throwable) -> {
+                if (throwable != null) {
+                    future.completeExceptionally(throwable);
+                    // Stop here: joining a failed future would throw inside this callback,
+                    // and the result future has already been completed.
+                    return;
+                }
+                for (CompletableFuture<List<BatchAckResult>> resultFuture : futures) {
+                    batchAckResultList.addAll(resultFuture.join());
+                }
+                future.complete(batchAckResultList);
+            });
+        } catch (Throwable t) {
+            future.completeExceptionally(t);
+        }
+        return FutureUtils.addExecutor(future, this.executor);
+    }
+
+    /**
+     * Sends one batch ack for the given handles through the message service and expands
+     * the single outcome into one {@link BatchAckResult} per handle.
+     *
+     * <p>On success every handle shares the returned {@code AckResult}. On failure every
+     * handle gets its own {@code INTERNAL_SERVER_ERROR} {@link ProxyException} wrapping the
+     * cause, so the returned future does not complete exceptionally on that path.
+     *
+     * @param ctx proxy context of the calling client
+     * @param consumerGroup consumer group passed to the message service
+     * @param topic topic passed to the message service
+     * @param handleMessageList handles acked together in this request
+     * @param timeoutMillis timeout passed to the message service
+     * @return a future holding one result per handle in {@code handleMessageList}
+     */
+    protected CompletableFuture<List<BatchAckResult>> processBrokerHandle(ProxyContext ctx,
+        String consumerGroup, String topic, List<ReceiptHandleMessage> handleMessageList, long timeoutMillis) {
+        CompletableFuture<AckResult> brokerAck = this.serviceManager.getMessageService()
+            .batchAckMessage(ctx, handleMessageList, consumerGroup, topic, timeoutMillis);
+        return brokerAck
+            .thenApply(ackResult -> {
+                List<BatchAckResult> succeeded = new ArrayList<>();
+                handleMessageList.forEach(handle -> succeeded.add(new BatchAckResult(handle, ackResult)));
+                return succeeded;
+            })
+            .exceptionally(cause -> {
+                List<BatchAckResult> failed = new ArrayList<>();
+                handleMessageList.forEach(handle -> failed.add(new BatchAckResult(handle,
+                    new ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR, cause.getMessage(), cause))));
+                return failed;
+            });
+    }
+
     public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx, 
ReceiptHandle handle,
         String messageId, String groupName, String topicName, long 
invisibleTime, long timeoutMillis) {
         CompletableFuture<AckResult> future = new CompletableFuture<>();
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 188cb7b9bd..ba150051bc 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.proxy.service.ServiceManager;
 import org.apache.rocketmq.proxy.service.ServiceManagerFactory;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
 import org.apache.rocketmq.proxy.service.metadata.MetadataService;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
 import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
@@ -183,6 +184,12 @@ public class DefaultMessagingProcessor extends 
AbstractStartAndShutdown implemen
         return this.consumerProcessor.ackMessage(ctx, handle, messageId, 
consumerGroup, topic, timeoutMillis);
     }
 
+    // Pure delegation: the batch ack logic lives in the consumer processor.
+    @Override
+    public CompletableFuture<List<BatchAckResult>> batchAckMessage(
+        ProxyContext ctx,
+        List<ReceiptHandleMessage> handleMessageList,
+        String consumerGroup,
+        String topic,
+        long timeoutMillis
+    ) {
+        return this.consumerProcessor.batchAckMessage(ctx, handleMessageList, consumerGroup, topic, timeoutMillis);
+    }
+
     @Override
     public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx, 
ReceiptHandle handle, String messageId,
         String groupName, String topicName, long invisibleTime, long 
timeoutMillis) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index d86be0bd88..2ae7418ba7 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.proxy.common.Address;
 import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
 import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
 import org.apache.rocketmq.proxy.service.metadata.MetadataService;
 import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
 import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
@@ -155,6 +156,23 @@ public interface MessagingProcessor extends 
StartAndShutdown {
         long timeoutMillis
     );
 
+    /**
+     * Batch-acks the given messages using {@code DEFAULT_TIMEOUT_MILLS} as the timeout.
+     *
+     * @param ctx proxy context of the calling client
+     * @param handleMessageList handle/message pairs to ack
+     * @param consumerGroup consumer group of the messages
+     * @param topic topic of the messages
+     * @return whatever the timeout-taking overload returns for the same arguments
+     */
+    default CompletableFuture<List<BatchAckResult>> batchAckMessage(
+        ProxyContext ctx,
+        List<ReceiptHandleMessage> handleMessageList,
+        String consumerGroup,
+        String topic
+    ) {
+        final long timeoutMillis = DEFAULT_TIMEOUT_MILLS;
+        return batchAckMessage(ctx, handleMessageList, consumerGroup, topic, timeoutMillis);
+    }
+
+    /**
+     * Batch-acks the given messages with an explicit timeout.
+     *
+     * @param ctx proxy context of the calling client
+     * @param handleMessageList handle/message pairs to ack
+     * @param consumerGroup consumer group of the messages
+     * @param topic topic of the messages
+     * @param timeoutMillis timeout for the ack request(s)
+     * @return a future holding the per-message results; NOTE(review): result ordering
+     *     relative to {@code handleMessageList} is implementation defined - confirm
+     *     before relying on positional matching
+     */
+    CompletableFuture<List<BatchAckResult>> batchAckMessage(
+        ProxyContext ctx,
+        List<ReceiptHandleMessage> handleMessageList,
+        String consumerGroup,
+        String topic,
+        long timeoutMillis
+    );
+
     default CompletableFuture<AckResult> changeInvisibleTime(
         ProxyContext ctx,
         ReceiptHandle handle,
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
index 9f163f1b98..70b72deae1 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
@@ -20,9 +20,11 @@ import com.google.common.collect.Lists;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 import org.apache.rocketmq.client.consumer.AckResult;
 import org.apache.rocketmq.client.consumer.PopResult;
 import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.consumer.ReceiptHandle;
 import org.apache.rocketmq.common.message.Message;
@@ -31,7 +33,6 @@ import org.apache.rocketmq.proxy.common.ProxyContext;
 import org.apache.rocketmq.proxy.common.ProxyException;
 import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
 import org.apache.rocketmq.proxy.common.utils.FutureUtils;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
 import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
 import org.apache.rocketmq.proxy.service.route.TopicRouteService;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -137,6 +138,19 @@ public class ClusterMessageService implements 
MessageService {
         );
     }
 
+    /**
+     * Sends one batch ack request for all given handles. The broker address is resolved
+     * from the first handle only, so the list must be non-empty; presumably callers pass
+     * handles that all belong to the same broker - verify against the caller.
+     */
+    @Override
+    public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx, List<ReceiptHandleMessage> handleList,
+        String consumerGroup, String topic, long timeoutMillis) {
+        List<String> extraInfoList = handleList.stream()
+            .map(ReceiptHandleMessage::getReceiptHandle)
+            .map(ReceiptHandle::getReceiptHandle)
+            .collect(Collectors.toList());
+        return this.mqClientAPIFactory.getClient().batchAckMessageAsync(
+            this.resolveBrokerAddrInReceiptHandle(ctx, handleList.get(0).getReceiptHandle()),
+            topic,
+            consumerGroup,
+            extraInfoList,
+            timeoutMillis
+        );
+    }
+
     @Override
     public CompletableFuture<PullResult> pullMessage(ProxyContext ctx, 
AddressableMessageQueue messageQueue,
         PullMessageRequestHeader requestHeader, long timeoutMillis) {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index eb2c4d9ee9..ca7dcc9eb0 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.service.message;
 import io.netty.channel.ChannelHandlerContext;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -54,6 +55,8 @@ import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RequestCode;
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.BatchAck;
+import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
 import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
@@ -364,6 +367,61 @@ public class LocalMessageService implements MessageService 
{
         });
     }
 
+    /**
+     * Acknowledges a batch of messages against the broker running in this process.
+     * <p>
+     * All handles are folded into one {@link BatchAckMessageRequestBody} and handed directly to the
+     * broker's ack processor through an in-memory channel; {@code timeoutMillis} is not consulted here.
+     *
+     * @param ctx           proxy context used to create the in-process channel
+     * @param handleList    receipt handles (with message ids) of the messages to acknowledge
+     * @param consumerGroup consumer group written into every {@link BatchAck}
+     * @param topic         topic written into every {@link BatchAck}
+     * @param timeoutMillis unused by this implementation
+     * @return a future holding {@link AckStatus#OK} when the processor answers
+     *         {@link ResponseCode#SUCCESS} and {@link AckStatus#NO_EXIST} for any other response code;
+     *         the future completes exceptionally when building or processing the request throws
+     */
+    @Override
+    public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx, List<ReceiptHandleMessage> handleList,
+        String consumerGroup, String topic, long timeoutMillis) {
+        SimpleChannel channel = channelManager.createChannel(ctx);
+        ChannelHandlerContext channelHandlerContext = channel.getChannelHandlerContext();
+        RemotingCommand command = LocalRemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        try {
+            // Building the body parses every receipt handle and computes bit indexes. Keeping it inside
+            // the try makes a malformed handle fail the returned future instead of escaping synchronously.
+            command.setBody(buildBatchAckRequestBody(handleList, consumerGroup, topic).encode());
+            RemotingCommand response = brokerController.getAckMessageProcessor()
+                .processRequest(channelHandlerContext, command);
+            future.complete(response);
+        } catch (Exception e) {
+            log.error("Fail to process batchAckMessage command", e);
+            future.completeExceptionally(e);
+        }
+        return future.thenApply(r -> {
+            AckResult ackResult = new AckResult();
+            if (ResponseCode.SUCCESS == r.getCode()) {
+                ackResult.setStatus(AckStatus.OK);
+            } else {
+                ackResult.setStatus(AckStatus.NO_EXIST);
+            }
+            return ackResult;
+        });
+    }
+
+    /**
+     * Groups receipt handles into one {@link BatchAck} per (retry, queue id, ck queue offset, pop time)
+     * combination and marks each message as the bit at (queue offset - ck queue offset).
+     */
+    private BatchAckMessageRequestBody buildBatchAckRequestBody(List<ReceiptHandleMessage> handleList,
+        String consumerGroup, String topic) {
+        Map<String, BatchAck> batchAckMap = new HashMap<>();
+        for (ReceiptHandleMessage receiptHandleMessage : handleList) {
+            String extraInfo = receiptHandleMessage.getReceiptHandle().getReceiptHandle();
+            String[] extraInfoData = ExtraInfoUtil.split(extraInfo);
+            String mergeKey = ExtraInfoUtil.getRetry(extraInfoData) + "@" +
+                ExtraInfoUtil.getQueueId(extraInfoData) + "@" +
+                ExtraInfoUtil.getCkQueueOffset(extraInfoData) + "@" +
+                ExtraInfoUtil.getPopTime(extraInfoData);
+            BatchAck bAck = batchAckMap.computeIfAbsent(mergeKey, k -> {
+                BatchAck newBatchAck = new BatchAck();
+                newBatchAck.setConsumerGroup(consumerGroup);
+                newBatchAck.setTopic(topic);
+                newBatchAck.setRetry(ExtraInfoUtil.getRetry(extraInfoData));
+                newBatchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfoData));
+                newBatchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfoData));
+                newBatchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfoData));
+                newBatchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfoData));
+                newBatchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfoData));
+                newBatchAck.setBitSet(new BitSet());
+                return newBatchAck;
+            });
+            // BitSet.set rejects a negative index, i.e. a queue offset below the ck queue offset.
+            bAck.getBitSet().set((int) (ExtraInfoUtil.getQueueOffset(extraInfoData) - ExtraInfoUtil.getCkQueueOffset(extraInfoData)));
+        }
+        BatchAckMessageRequestBody requestBody = new BatchAckMessageRequestBody();
+        requestBody.setBrokerName(brokerController.getBrokerConfig().getBrokerName());
+        requestBody.setAcks(new ArrayList<>(batchAckMap.values()));
+        return requestBody;
+    }
+
     @Override
     public CompletableFuture<PullResult> pullMessage(ProxyContext ctx, 
AddressableMessageQueue messageQueue,
         PullMessageRequestHeader requestHeader, long timeoutMillis) {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
index 15da171540..58a835adb4 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
@@ -91,6 +91,14 @@ public interface MessageService {
         long timeoutMillis
     );
 
+    /**
+     * Acknowledges several messages in one call and reports a single result for the whole list.
+     * NOTE(review): the cluster implementation in this change resolves the broker address from the
+     * first handle only, so callers presumably must pass handles of one broker per call - confirm.
+     *
+     * @param ctx           proxy context of the request
+     * @param handleList    receipt handle and message id of each message to acknowledge
+     * @param consumerGroup consumer group name
+     * @param topic         topic name
+     * @param timeoutMillis timeout in milliseconds
+     * @return a future holding one {@link AckResult} for the entire {@code handleList}
+     */
+    CompletableFuture<AckResult> batchAckMessage(
+        ProxyContext ctx,
+        List<ReceiptHandleMessage> handleList,
+        String consumerGroup,
+        String topic,
+        long timeoutMillis
+    );
+
     CompletableFuture<PullResult> pullMessage(
         ProxyContext ctx,
         AddressableMessageQueue messageQueue,
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java
new file mode 100644
index 0000000000..ae63fed491
--- /dev/null
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.message;
+
+import org.apache.rocketmq.common.consumer.ReceiptHandle;
+
+/**
+ * Pairs a {@link ReceiptHandle} with the id of the message it accompanies.
+ * Both values are assigned once in the constructor and are exposed through getters only.
+ */
+public class ReceiptHandleMessage {
+
+    private final ReceiptHandle receiptHandle;
+    private final String messageId;
+
+    /**
+     * @param receiptHandle receipt handle of the message
+     * @param messageId     id of the message
+     */
+    public ReceiptHandleMessage(ReceiptHandle receiptHandle, String messageId) 
{
+        this.receiptHandle = receiptHandle;
+        this.messageId = messageId;
+    }
+
+    /** @return the receipt handle passed to the constructor */
+    public ReceiptHandle getReceiptHandle() {
+        return receiptHandle;
+    }
+
+    /** @return the message id passed to the constructor */
+    public String getMessageId() {
+        return messageId;
+    }
+}
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java
index 49fdfc6a8b..3c47461051 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java
@@ -20,21 +20,32 @@ package org.apache.rocketmq.proxy.grpc.v2.consumer;
 import apache.rocketmq.v2.AckMessageEntry;
 import apache.rocketmq.v2.AckMessageRequest;
 import apache.rocketmq.v2.AckMessageResponse;
+import apache.rocketmq.v2.AckMessageResultEntry;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.Resource;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.client.consumer.AckResult;
 import org.apache.rocketmq.client.consumer.AckStatus;
 import org.apache.rocketmq.proxy.common.ProxyException;
 import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
 import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
+import org.apache.rocketmq.proxy.processor.BatchAckResult;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.stubbing.Answer;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.when;
 
 public class AckMessageActivityTest extends BaseActivityTest {
@@ -52,43 +63,197 @@ public class AckMessageActivityTest extends 
BaseActivityTest {
 
     @Test
     public void testAckMessage() throws Throwable {
-        when(this.messagingProcessor.ackMessage(any(), any(), eq("msg1"), 
anyString(), anyString()))
+        ConfigurationManager.getProxyConfig().setEnableBatchAck(false);
+
+        String msg1 = "msg1";
+        String msg2 = "msg2";
+        String msg3 = "msg3";
+
+        when(this.messagingProcessor.ackMessage(any(), any(), eq(msg1), 
anyString(), anyString()))
             .thenThrow(new 
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is 
expired"));
 
         AckResult msg2AckResult = new AckResult();
         msg2AckResult.setStatus(AckStatus.OK);
-        when(this.messagingProcessor.ackMessage(any(), any(), eq("msg2"), 
anyString(), anyString()))
+        when(this.messagingProcessor.ackMessage(any(), any(), eq(msg2), 
anyString(), anyString()))
             .thenReturn(CompletableFuture.completedFuture(msg2AckResult));
 
         AckResult msg3AckResult = new AckResult();
         msg3AckResult.setStatus(AckStatus.NO_EXIST);
-        when(this.messagingProcessor.ackMessage(any(), any(), eq("msg3"), 
anyString(), anyString()))
+        when(this.messagingProcessor.ackMessage(any(), any(), eq(msg3), 
anyString(), anyString()))
             .thenReturn(CompletableFuture.completedFuture(msg3AckResult));
 
-        AckMessageResponse response = this.ackMessageActivity.ackMessage(
-            createContext(),
-            AckMessageRequest.newBuilder()
-                .setTopic(Resource.newBuilder().setName(TOPIC).build())
-                .setGroup(Resource.newBuilder().setName(GROUP).build())
-                .addEntries(AckMessageEntry.newBuilder()
-                    .setMessageId("msg1")
-                    .setReceiptHandle(buildReceiptHandle(TOPIC, 
System.currentTimeMillis() - 10000, 1000))
-                    .build())
-                .addEntries(AckMessageEntry.newBuilder()
-                    .setMessageId("msg2")
-                    .setReceiptHandle(buildReceiptHandle(TOPIC, 
System.currentTimeMillis(), 3000))
-                    .build())
-                .addEntries(AckMessageEntry.newBuilder()
-                    .setMessageId("msg3")
-                    .setReceiptHandle(buildReceiptHandle(TOPIC, 
System.currentTimeMillis(), 3000))
-                    .build())
-                .build()
-        ).get();
-
-        assertEquals(Code.MULTIPLE_RESULTS, response.getStatus().getCode());
-        assertEquals(3, response.getEntriesCount());
-        assertEquals(Code.INVALID_RECEIPT_HANDLE, 
response.getEntries(0).getStatus().getCode());
-        assertEquals(Code.OK, response.getEntries(1).getStatus().getCode());
-        assertEquals(Code.INTERNAL_SERVER_ERROR, 
response.getEntries(2).getStatus().getCode());
+        {
+            AckMessageResponse response = this.ackMessageActivity.ackMessage(
+                createContext(),
+                AckMessageRequest.newBuilder()
+                    .setTopic(Resource.newBuilder().setName(TOPIC).build())
+                    .setGroup(Resource.newBuilder().setName(GROUP).build())
+                    .addEntries(AckMessageEntry.newBuilder()
+                        .setMessageId(msg1)
+                        .setReceiptHandle(buildReceiptHandle(TOPIC, 
System.currentTimeMillis() - 10000, 1000))
+                        .build())
+                    .build()
+            ).get();
+            assertEquals(Code.INVALID_RECEIPT_HANDLE, 
response.getStatus().getCode());
+        }
+        {
+            AckMessageResponse response = this.ackMessageActivity.ackMessage(
+                createContext(),
+                AckMessageRequest.newBuilder()
+                    .setTopic(Resource.newBuilder().setName(TOPIC).build())
+                    .setGroup(Resource.newBuilder().setName(GROUP).build())
+                    .addEntries(AckMessageEntry.newBuilder()
+                        .setMessageId(msg2)
+                        .setReceiptHandle(buildReceiptHandle(TOPIC, 
System.currentTimeMillis() - 10000, 1000))
+                        .build())
+                    .build()
+            ).get();
+            assertEquals(Code.OK, response.getStatus().getCode());
+        }
+        {
+            AckMessageResponse response = this.ackMessageActivity.ackMessage(
+                createContext(),
+                AckMessageRequest.newBuilder()
+                    .setTopic(Resource.newBuilder().setName(TOPIC).build())
+                    .setGroup(Resource.newBuilder().setName(GROUP).build())
+                    .addEntries(AckMessageEntry.newBuilder()
+                        .setMessageId(msg3)
+                        .setReceiptHandle(buildReceiptHandle(TOPIC, 
System.currentTimeMillis() - 10000, 1000))
+                        .build())
+                    .build()
+            ).get();
+            assertEquals(Code.INTERNAL_SERVER_ERROR, 
response.getStatus().getCode());
+        }
+        {
+            AckMessageResponse response = this.ackMessageActivity.ackMessage(
+                createContext(),
+                AckMessageRequest.newBuilder()
+                    .setTopic(Resource.newBuilder().setName(TOPIC).build())
+                    .setGroup(Resource.newBuilder().setName(GROUP).build())
+                    .addEntries(AckMessageEntry.newBuilder()
+                        .setMessageId(msg1)
+                        .setReceiptHandle(buildReceiptHandle(TOPIC, 
System.currentTimeMillis() - 10000, 1000))
+                        .build())
+                    .addEntries(AckMessageEntry.newBuilder()
+                        .setMessageId(msg2)
+                        .setReceiptHandle(buildReceiptHandle(TOPIC, 
System.currentTimeMillis(), 3000))
+                        .build())
+                    .addEntries(AckMessageEntry.newBuilder()
+                        .setMessageId(msg3)
+                        .setReceiptHandle(buildReceiptHandle(TOPIC, 
System.currentTimeMillis(), 3000))
+                        .build())
+                    .build()
+            ).get();
+
+            assertEquals(Code.MULTIPLE_RESULTS, 
response.getStatus().getCode());
+            assertEquals(3, response.getEntriesCount());
+            assertEquals(Code.INVALID_RECEIPT_HANDLE, 
response.getEntries(0).getStatus().getCode());
+            assertEquals(Code.OK, 
response.getEntries(1).getStatus().getCode());
+            assertEquals(Code.INTERNAL_SERVER_ERROR, 
response.getEntries(2).getStatus().getCode());
+        }
+    }
+
+    @Test
+    public void testAckMessageInBatch() throws Throwable {
+        ConfigurationManager.getProxyConfig().setEnableBatchAck(true);
+
+        String successMessageId = "msg1";
+        String notOkMessageId = "msg2";
+        String exceptionMessageId = "msg3";
+
+        // Stubbed processor: msg1 -> OK, msg2 -> NO_EXIST, any other id -> INVALID_RECEIPT_HANDLE exception.
+        Answer<CompletableFuture<List<BatchAckResult>>> perMessageAnswer = invocation -> {
+            List<ReceiptHandleMessage> requested = invocation.getArgument(1, List.class);
+            List<BatchAckResult> results = new ArrayList<>();
+            for (ReceiptHandleMessage handleMessage : requested) {
+                if (handleMessage.getMessageId().equals(successMessageId)) {
+                    AckResult ok = new AckResult();
+                    ok.setStatus(AckStatus.OK);
+                    results.add(new BatchAckResult(handleMessage, ok));
+                } else if (handleMessage.getMessageId().equals(notOkMessageId)) {
+                    AckResult missing = new AckResult();
+                    missing.setStatus(AckStatus.NO_EXIST);
+                    results.add(new BatchAckResult(handleMessage, missing));
+                } else {
+                    results.add(new BatchAckResult(handleMessage,
+                        new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "")));
+                }
+            }
+            return CompletableFuture.completedFuture(results);
+        };
+        doAnswer(perMessageAnswer).when(this.messagingProcessor).batchAckMessage(any(), anyList(), anyString(), anyString());
+
+        // A single-entry request surfaces the entry's code as the response status.
+        assertEquals(Code.OK, ackEntriesInBatch(successMessageId).getStatus().getCode());
+        assertEquals(Code.INTERNAL_SERVER_ERROR, ackEntriesInBatch(notOkMessageId).getStatus().getCode());
+        assertEquals(Code.INVALID_RECEIPT_HANDLE, ackEntriesInBatch(exceptionMessageId).getStatus().getCode());
+
+        // A multi-entry request reports MULTIPLE_RESULTS plus one code per entry.
+        AckMessageResponse mixed = ackEntriesInBatch(successMessageId, notOkMessageId, exceptionMessageId);
+        assertEquals(Code.MULTIPLE_RESULTS, mixed.getStatus().getCode());
+        assertEquals(3, mixed.getEntriesCount());
+        Map<String, Code> codeByMessageId = new HashMap<>();
+        mixed.getEntriesList().forEach(entry -> codeByMessageId.put(entry.getMessageId(), entry.getStatus().getCode()));
+        assertEquals(Code.OK, codeByMessageId.get(successMessageId));
+        assertEquals(Code.INTERNAL_SERVER_ERROR, codeByMessageId.get(notOkMessageId));
+        assertEquals(Code.INVALID_RECEIPT_HANDLE, codeByMessageId.get(exceptionMessageId));
+    }
+
+    /** Sends one ack request with an entry (fresh receipt handle, 3000 ms invisible time) per given message id. */
+    private AckMessageResponse ackEntriesInBatch(String... messageIds) throws Throwable {
+        return this.ackMessageActivity.ackMessage(createContext(), batchAckRequestFor(messageIds)).get();
+    }
+
+    /** Builds the ack request for {@link #ackEntriesInBatch(String...)}. */
+    private AckMessageRequest batchAckRequestFor(String... messageIds) {
+        AckMessageRequest.Builder request = AckMessageRequest.newBuilder()
+            .setTopic(Resource.newBuilder().setName(TOPIC).build())
+            .setGroup(Resource.newBuilder().setName(GROUP).build());
+        for (String messageId : messageIds) {
+            request.addEntries(AckMessageEntry.newBuilder()
+                .setMessageId(messageId)
+                .setReceiptHandle(buildReceiptHandle(TOPIC, System.currentTimeMillis(), 3000))
+                .build());
+        }
+        return request.build();
     }
 }
\ No newline at end of file
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java
index 5c1ea9627e..072630e394 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java
@@ -66,14 +66,6 @@ public class BaseProcessorTest extends InitConfigTest {
     protected ProxyRelayService proxyRelayService;
     @Mock
     protected MetadataService metadataService;
-    @Mock
-    protected ProducerProcessor producerProcessor;
-    @Mock
-    protected ConsumerProcessor consumerProcessor;
-    @Mock
-    protected TransactionProcessor transactionProcessor;
-    @Mock
-    protected ClientProcessor clientProcessor;
 
     public void before() throws Throwable {
         super.before();
@@ -92,6 +84,13 @@ public class BaseProcessorTest extends InitConfigTest {
     }
 
     protected static MessageExt createMessageExt(String topic, String tags, 
int reconsumeTimes, long invisibleTime) {
+        return createMessageExt(topic, tags, reconsumeTimes, invisibleTime, 
System.currentTimeMillis(),
+            RANDOM.nextInt(Integer.MAX_VALUE), 
RANDOM.nextInt(Integer.MAX_VALUE), RANDOM.nextInt(Integer.MAX_VALUE),
+            RANDOM.nextInt(Integer.MAX_VALUE), "mockBroker");
+    }
+
+    protected static MessageExt createMessageExt(String topic, String tags, 
int reconsumeTimes, long invisibleTime, long popTime,
+        long startOffset, int reviveQid, int queueId, long queueOffset, String 
brokerName) {
         MessageExt messageExt = new MessageExt();
         messageExt.setTopic(topic);
         messageExt.setTags(tags);
@@ -100,8 +99,7 @@ public class BaseProcessorTest extends InitConfigTest {
         messageExt.setMsgId(MessageClientIDSetter.createUniqID());
         messageExt.setCommitLogOffset(RANDOM.nextInt(Integer.MAX_VALUE));
         MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_POP_CK,
-            ExtraInfoUtil.buildExtraInfo(RANDOM.nextInt(Integer.MAX_VALUE), 
System.currentTimeMillis(), invisibleTime,
-                RANDOM.nextInt(Integer.MAX_VALUE), topic, "mockBroker", 
RANDOM.nextInt(Integer.MAX_VALUE), RANDOM.nextInt(Integer.MAX_VALUE)));
+            ExtraInfoUtil.buildExtraInfo(startOffset, popTime, invisibleTime, 
reviveQid, topic, brokerName, queueId, queueOffset));
         return messageExt;
     }
 
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
index 717e86fc05..db268a06e6 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
@@ -20,8 +20,11 @@ package org.apache.rocketmq.proxy.processor;
 import com.google.common.collect.Sets;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
@@ -39,7 +42,10 @@ import 
org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.common.utils.FutureUtils;
 import org.apache.rocketmq.proxy.common.utils.ProxyUtils;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
 import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
 import org.apache.rocketmq.proxy.service.route.MessageQueueView;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -50,16 +56,22 @@ import 
org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.stubbing.Answer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class ConsumerProcessorTest extends BaseProcessorTest {
@@ -162,6 +174,109 @@ public class ConsumerProcessorTest extends 
BaseProcessorTest {
         assertEquals(handle.getReceiptHandle(), 
requestHeaderArgumentCaptor.getValue().getExtraInfo());
     }
 
+    @Test
+    public void testBatchAckExpireMessage() throws Throwable {
+        String brokerName1 = "brokerName1";
+
+        List<ReceiptHandleMessage> receiptHandleMessageList = new 
ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            MessageExt expireMessage = createMessageExt(TOPIC, "", 0, 3000, 
System.currentTimeMillis() - 10000,
+                0, 0, 0, i, brokerName1);
+            ReceiptHandle expireHandle = create(expireMessage);
+            receiptHandleMessageList.add(new 
ReceiptHandleMessage(expireHandle, expireMessage.getMsgId()));
+        }
+
+        List<BatchAckResult> batchAckResultList = 
this.consumerProcessor.batchAckMessage(createContext(), 
receiptHandleMessageList, CONSUMER_GROUP, TOPIC, 3000).get();
+
+        verify(this.messageService, never()).batchAckMessage(any(), anyList(), 
anyString(), anyString(), anyLong());
+        assertEquals(receiptHandleMessageList.size(), 
batchAckResultList.size());
+        for (BatchAckResult batchAckResult : batchAckResultList) {
+            assertNull(batchAckResult.getAckResult());
+            assertNotNull(batchAckResult.getProxyException());
+            assertNotNull(batchAckResult.getReceiptHandleMessage());
+        }
+
+    }
+
+    // Checks the per-message outcome of one mixed batch: an expired handle, three handles each for
+    // two brokers (stubbed OK / NO_EXIST), and one handle for a broker whose stub returns a failed future.
+    @Test
+    public void testBatchAckMessage() throws Throwable {
+        String brokerName1 = "brokerName1";
+        String brokerName2 = "brokerName2";
+        String errThrowBrokerName = "errThrowBrokerName";
+        // pop time 10s in the past with a 3s invisible time; asserted below to yield INVALID_RECEIPT_HANDLE
+        MessageExt expireMessage = createMessageExt(TOPIC, "", 0, 3000, 
System.currentTimeMillis() - 10000,
+            0, 0, 0, 0, brokerName1);
+        ReceiptHandle expireHandle = create(expireMessage);
+
+        List<ReceiptHandleMessage> receiptHandleMessageList = new 
ArrayList<>();
+        receiptHandleMessageList.add(new ReceiptHandleMessage(expireHandle, 
expireMessage.getMsgId()));
+        // message ids per broker, used later to look up each message's result
+        List<String> broker1Msg = new ArrayList<>();
+        List<String> broker2Msg = new ArrayList<>();
+
+        long now = System.currentTimeMillis();
+        int msgNum = 3;
+        for (int i = 0; i < msgNum; i++) {
+            MessageExt brokerMessage = createMessageExt(TOPIC, "", 0, 3000, 
now,
+                0, 0, 0, i + 1, brokerName1);
+            ReceiptHandle brokerHandle = create(brokerMessage);
+            receiptHandleMessageList.add(new 
ReceiptHandleMessage(brokerHandle, brokerMessage.getMsgId()));
+            broker1Msg.add(brokerMessage.getMsgId());
+        }
+        for (int i = 0; i < msgNum; i++) {
+            MessageExt brokerMessage = createMessageExt(TOPIC, "", 0, 3000, 
now,
+                0, 0, 0, i + 1, brokerName2);
+            ReceiptHandle brokerHandle = create(brokerMessage);
+            receiptHandleMessageList.add(new 
ReceiptHandleMessage(brokerHandle, brokerMessage.getMsgId()));
+            broker2Msg.add(brokerMessage.getMsgId());
+        }
+
+        // the stubbed batchAckMessage below returns a failed future for this message's broker
+        MessageExt errThrowMessage = createMessageExt(TOPIC, "", 0, 3000, now,
+            0, 0, 0, 0, errThrowBrokerName);
+        ReceiptHandle errThrowHandle = create(errThrowMessage);
+        receiptHandleMessageList.add(new ReceiptHandleMessage(errThrowHandle, 
errThrowMessage.getMsgId()));
+
+        // presumably so the assertions cannot depend on input order - results are looked up by message id
+        Collections.shuffle(receiptHandleMessageList);
+
+        // stub keyed on the broker name of the FIRST handle in the list it receives:
+        // brokerName1 -> OK, brokerName2 -> NO_EXIST, anything else -> exceptionally completed future
+        doAnswer((Answer<CompletableFuture<AckResult>>) invocation -> {
+            List<ReceiptHandleMessage> handleMessageList = 
invocation.getArgument(1, List.class);
+            AckResult ackResult = new AckResult();
+            String brokerName = 
handleMessageList.get(0).getReceiptHandle().getBrokerName();
+            if (brokerName.equals(brokerName1)) {
+                ackResult.setStatus(AckStatus.OK);
+            } else if (brokerName.equals(brokerName2)) {
+                ackResult.setStatus(AckStatus.NO_EXIST);
+            } else {
+                return FutureUtils.completeExceptionally(new 
RuntimeException());
+            }
+
+            return CompletableFuture.completedFuture(ackResult);
+        }).when(this.messageService).batchAckMessage(any(), anyList(), 
anyString(), anyString(), anyLong());
+
+        List<BatchAckResult> batchAckResultList = 
this.consumerProcessor.batchAckMessage(createContext(), 
receiptHandleMessageList, CONSUMER_GROUP, TOPIC, 3000).get();
+        // one result per input handle, including the expired and the failing one
+        assertEquals(receiptHandleMessageList.size(), 
batchAckResultList.size());
+
+        // check ackResult for each msg
+        Map<String, BatchAckResult> msgBatchAckResult = new HashMap<>();
+        for (BatchAckResult batchAckResult : batchAckResultList) {
+            
msgBatchAckResult.put(batchAckResult.getReceiptHandleMessage().getMessageId(), 
batchAckResult);
+        }
+        for (String msgId : broker1Msg) {
+            assertEquals(AckStatus.OK, 
msgBatchAckResult.get(msgId).getAckResult().getStatus());
+            assertNull(msgBatchAckResult.get(msgId).getProxyException());
+        }
+        for (String msgId : broker2Msg) {
+            assertEquals(AckStatus.NO_EXIST, 
msgBatchAckResult.get(msgId).getAckResult().getStatus());
+            assertNull(msgBatchAckResult.get(msgId).getProxyException());
+        }
+        // expired handle: exception with INVALID_RECEIPT_HANDLE and no ack result
+        
assertNotNull(msgBatchAckResult.get(expireMessage.getMsgId()).getProxyException());
+        assertEquals(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, 
msgBatchAckResult.get(expireMessage.getMsgId()).getProxyException().getCode());
+        
assertNull(msgBatchAckResult.get(expireMessage.getMsgId()).getAckResult());
+
+        // failed future from the message service: exception with INTERNAL_SERVER_ERROR and no ack result
+        
assertNotNull(msgBatchAckResult.get(errThrowMessage.getMsgId()).getProxyException());
+        assertEquals(ProxyExceptionCode.INTERNAL_SERVER_ERROR, 
msgBatchAckResult.get(errThrowMessage.getMsgId()).getProxyException().getCode());
+        
assertNull(msgBatchAckResult.get(errThrowMessage.getMsgId()).getAckResult());
+    }
+
     @Test
     public void testChangeInvisibleTime() throws Throwable {
         ReceiptHandle handle = 
create(createMessageExt(MixAll.RETRY_GROUP_TOPIC_PREFIX + TOPIC, "", 0, 3000));
diff --git 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
index 77a119a296..3f3a4ae40c 100644
--- 
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
+++ 
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
@@ -220,6 +220,18 @@ public class MQClientAPIExtTest {
         assertSame(ackResult, mqClientAPI.ackMessageAsync(BROKER_ADDR, new 
AckMessageRequestHeader(), TIMEOUT).get());
     }
 
+    @Test
+    public void testBatchAckMessageAsync() throws Exception {
+        AckResult expected = new AckResult();
+        // The callback-based overload is stubbed to report success immediately with the expected result.
+        Answer<Void> succeedImmediately = invocation -> {
+            AckCallback callback = invocation.getArgument(2);
+            callback.onSuccess(expected);
+            return null;
+        };
+        doAnswer(succeedImmediately).when(mqClientAPI).batchAckMessageAsync(anyString(), anyLong(), any(AckCallback.class), any());
+
+        // The future-returning overload must hand back the very same instance.
+        AckResult actual = mqClientAPI.batchAckMessageAsync(BROKER_ADDR, TOPIC, CONSUMER_GROUP, new ArrayList<>(), TIMEOUT).get();
+        assertSame(expected, actual);
+    }
+
     @Test
     public void testChangeInvisibleTimeAsync() throws Exception {
         AckResult ackResult = new AckResult();

Reply via email to