This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 77e8e54b37 [ISSUE #7223] Support batch ack for grpc client in proxy
(#7225)
77e8e54b37 is described below
commit 77e8e54b37c3fc3ea0beffc1ace6f5bf20af10d9
Author: lk <[email protected]>
AuthorDate: Wed Aug 23 15:56:39 2023 +0800
[ISSUE #7223] Support batch ack for grpc client in proxy (#7225)
---
.../client/impl/mqclient/MQClientAPIExt.java | 26 +++
.../apache/rocketmq/proxy/config/ProxyConfig.java | 10 +
.../proxy/grpc/v2/consumer/AckMessageActivity.java | 136 +++++++++----
.../proxy/processor/AbstractProcessor.java | 4 +-
.../rocketmq/proxy/processor/BatchAckResult.java | 53 +++++
.../proxy/processor/ConsumerProcessor.java | 64 ++++++
.../proxy/processor/DefaultMessagingProcessor.java | 7 +
.../proxy/processor/MessagingProcessor.java | 18 ++
.../service/message/ClusterMessageService.java | 16 +-
.../proxy/service/message/LocalMessageService.java | 58 ++++++
.../proxy/service/message/MessageService.java | 8 +
.../service/message/ReceiptHandleMessage.java | 39 ++++
.../grpc/v2/consumer/AckMessageActivityTest.java | 221 ++++++++++++++++++---
.../proxy/processor/BaseProcessorTest.java | 18 +-
.../proxy/processor/ConsumerProcessorTest.java | 115 +++++++++++
.../proxy/service/mqclient/MQClientAPIExtTest.java | 12 ++
16 files changed, 728 insertions(+), 77 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
index fb8f8d11fd..d7c8ef8d92 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
@@ -306,6 +306,32 @@ public class MQClientAPIExt extends MQClientAPIImpl {
return future;
}
+ public CompletableFuture<AckResult> batchAckMessageAsync(
+ String brokerAddr,
+ String topic,
+ String consumerGroup,
+ List<String> extraInfoList,
+ long timeoutMillis
+ ) {
+ CompletableFuture<AckResult> future = new CompletableFuture<>();
+ try {
+ this.batchAckMessageAsync(brokerAddr, timeoutMillis, new
AckCallback() {
+ @Override
+ public void onSuccess(AckResult ackResult) {
+ future.complete(ackResult);
+ }
+
+ @Override
+ public void onException(Throwable t) {
+ future.completeExceptionally(t);
+ }
+ }, topic, consumerGroup, extraInfoList);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ return future;
+ }
+
public CompletableFuture<AckResult> changeInvisibleTimeAsync(
String brokerAddr,
String brokerName,
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 39caaa0d91..76a2439196 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -250,6 +250,8 @@ public class ProxyConfig implements ConfigFile {
private long remotingWaitTimeMillsInTopicRouteQueue = 3 * 1000;
private long remotingWaitTimeMillsInDefaultQueue = 3 * 1000;
+ private boolean enableBatchAck = false;
+
@Override
public void initData() {
parseDelayLevel();
@@ -1379,4 +1381,12 @@ public class ProxyConfig implements ConfigFile {
public void setRemotingWaitTimeMillsInDefaultQueue(long
remotingWaitTimeMillsInDefaultQueue) {
this.remotingWaitTimeMillsInDefaultQueue =
remotingWaitTimeMillsInDefaultQueue;
}
+
+ public boolean isEnableBatchAck() {
+ return enableBatchAck;
+ }
+
+ public void setEnableBatchAck(boolean enableBatchAck) {
+ this.enableBatchAck = enableBatchAck;
+ }
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
index 9a3a772017..97c716c8ff 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivity.java
@@ -31,12 +31,15 @@ import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.grpc.v2.AbstractMessingActivity;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcConverter;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
+import org.apache.rocketmq.proxy.processor.BatchAckResult;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
public class AckMessageActivity extends AbstractMessingActivity {
@@ -50,60 +53,98 @@ public class AckMessageActivity extends
AbstractMessingActivity {
try {
validateTopicAndConsumerGroup(request.getTopic(),
request.getGroup());
-
- CompletableFuture<AckMessageResultEntry>[] futures = new
CompletableFuture[request.getEntriesCount()];
- for (int i = 0; i < request.getEntriesCount(); i++) {
- futures[i] = processAckMessage(ctx, request,
request.getEntries(i));
+ String group =
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
+ String topic =
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic());
+ if (ConfigurationManager.getProxyConfig().isEnableBatchAck()) {
+ future = ackMessageInBatch(ctx, group, topic, request);
+ } else {
+ future = ackMessageOneByOne(ctx, group, topic, request);
}
- CompletableFuture.allOf(futures).whenComplete((val, throwable) -> {
- if (throwable != null) {
- future.completeExceptionally(throwable);
- return;
- }
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ return future;
+ }
+
+ protected CompletableFuture<AckMessageResponse>
ackMessageInBatch(ProxyContext ctx, String group, String topic,
AckMessageRequest request) {
+ List<ReceiptHandleMessage> handleMessageList = new
ArrayList<>(request.getEntriesCount());
+ for (AckMessageEntry ackMessageEntry : request.getEntriesList()) {
+ String handleString = getHandleString(ctx, group, request,
ackMessageEntry);
+ handleMessageList.add(new
ReceiptHandleMessage(ReceiptHandle.decode(handleString),
ackMessageEntry.getMessageId()));
+ }
+ return this.messagingProcessor.batchAckMessage(ctx, handleMessageList,
group, topic)
+ .thenApply(batchAckResultList -> {
+ AckMessageResponse.Builder responseBuilder =
AckMessageResponse.newBuilder();
Set<Code> responseCodes = new HashSet<>();
- List<AckMessageResultEntry> entryList = new ArrayList<>();
- for (CompletableFuture<AckMessageResultEntry> entryFuture :
futures) {
- AckMessageResultEntry entryResult = entryFuture.join();
- responseCodes.add(entryResult.getStatus().getCode());
- entryList.add(entryResult);
- }
- AckMessageResponse.Builder responseBuilder =
AckMessageResponse.newBuilder()
- .addAllEntries(entryList);
- if (responseCodes.size() > 1) {
-
responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.MULTIPLE_RESULTS,
Code.MULTIPLE_RESULTS.name()));
- } else if (responseCodes.size() == 1) {
- Code code = responseCodes.stream().findAny().get();
-
responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(code,
code.name()));
- } else {
-
responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR,
"ack message result is empty"));
+ for (BatchAckResult batchAckResult : batchAckResultList) {
+ AckMessageResultEntry entry =
convertToAckMessageResultEntry(batchAckResult);
+ responseBuilder.addEntries(entry);
+ responseCodes.add(entry.getStatus().getCode());
}
- future.complete(responseBuilder.build());
+ setAckResponseStatus(responseBuilder, responseCodes);
+ return responseBuilder.build();
});
- } catch (Throwable t) {
- future.completeExceptionally(t);
+ }
+
+ protected AckMessageResultEntry
convertToAckMessageResultEntry(BatchAckResult batchAckResult) {
+ ReceiptHandleMessage handleMessage =
batchAckResult.getReceiptHandleMessage();
+ AckMessageResultEntry.Builder resultBuilder =
AckMessageResultEntry.newBuilder()
+ .setMessageId(handleMessage.getMessageId())
+
.setReceiptHandle(handleMessage.getReceiptHandle().getReceiptHandle());
+ if (batchAckResult.getProxyException() != null) {
+
resultBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(batchAckResult.getProxyException()));
+ } else {
+ AckResult ackResult = batchAckResult.getAckResult();
+ if (AckStatus.OK.equals(ackResult.getStatus())) {
+
resultBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.OK,
Code.OK.name()));
+ } else {
+
resultBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR,
"ack failed: status is abnormal"));
+ }
}
- return future;
+ return resultBuilder.build();
}
- protected CompletableFuture<AckMessageResultEntry>
processAckMessage(ProxyContext ctx, AckMessageRequest request,
+ protected CompletableFuture<AckMessageResponse>
ackMessageOneByOne(ProxyContext ctx, String group, String topic,
AckMessageRequest request) {
+ CompletableFuture<AckMessageResponse> resultFuture = new
CompletableFuture<>();
+ CompletableFuture<AckMessageResultEntry>[] futures = new
CompletableFuture[request.getEntriesCount()];
+ for (int i = 0; i < request.getEntriesCount(); i++) {
+ futures[i] = processAckMessage(ctx, group, topic, request,
request.getEntries(i));
+ }
+ CompletableFuture.allOf(futures).whenComplete((val, throwable) -> {
+ if (throwable != null) {
+ resultFuture.completeExceptionally(throwable);
+ return;
+ }
+
+ Set<Code> responseCodes = new HashSet<>();
+ List<AckMessageResultEntry> entryList = new ArrayList<>();
+ for (CompletableFuture<AckMessageResultEntry> entryFuture :
futures) {
+ AckMessageResultEntry entryResult = entryFuture.join();
+ responseCodes.add(entryResult.getStatus().getCode());
+ entryList.add(entryResult);
+ }
+ AckMessageResponse.Builder responseBuilder =
AckMessageResponse.newBuilder()
+ .addAllEntries(entryList);
+ setAckResponseStatus(responseBuilder, responseCodes);
+ resultFuture.complete(responseBuilder.build());
+ });
+ return resultFuture;
+ }
+
+ protected CompletableFuture<AckMessageResultEntry>
processAckMessage(ProxyContext ctx, String group, String topic,
AckMessageRequest request,
AckMessageEntry ackMessageEntry) {
CompletableFuture<AckMessageResultEntry> future = new
CompletableFuture<>();
try {
- String handleString = ackMessageEntry.getReceiptHandle();
-
- String group =
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getGroup());
- MessageReceiptHandle messageReceiptHandle =
messagingProcessor.removeReceiptHandle(ctx,
grpcChannelManager.getChannel(ctx.getClientID()), group,
ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
- if (messageReceiptHandle != null) {
- handleString = messageReceiptHandle.getReceiptHandleStr();
- }
+ String handleString = this.getHandleString(ctx, group, request,
ackMessageEntry);
CompletableFuture<AckResult> ackResultFuture =
this.messagingProcessor.ackMessage(
ctx,
ReceiptHandle.decode(handleString),
ackMessageEntry.getMessageId(),
group,
-
GrpcConverter.getInstance().wrapResourceWithNamespace(request.getTopic()));
+ topic
+ );
ackResultFuture.thenAccept(result -> {
future.complete(convertToAckMessageResultEntry(ctx,
ackMessageEntry, result));
}).exceptionally(t -> {
@@ -139,4 +180,25 @@ public class AckMessageActivity extends
AbstractMessingActivity {
.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR,
"ack failed: status is abnormal"))
.build();
}
+
+ protected void setAckResponseStatus(AckMessageResponse.Builder
responseBuilder, Set<Code> responseCodes) {
+ if (responseCodes.size() > 1) {
+
responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.MULTIPLE_RESULTS,
Code.MULTIPLE_RESULTS.name()));
+ } else if (responseCodes.size() == 1) {
+ Code code = responseCodes.stream().findAny().get();
+
responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(code,
code.name()));
+ } else {
+
responseBuilder.setStatus(ResponseBuilder.getInstance().buildStatus(Code.INTERNAL_SERVER_ERROR,
"ack message result is empty"));
+ }
+ }
+
+ protected String getHandleString(ProxyContext ctx, String group,
AckMessageRequest request, AckMessageEntry ackMessageEntry) {
+ String handleString = ackMessageEntry.getReceiptHandle();
+
+ MessageReceiptHandle messageReceiptHandle =
messagingProcessor.removeReceiptHandle(ctx,
grpcChannelManager.getChannel(ctx.getClientID()), group,
ackMessageEntry.getMessageId(), ackMessageEntry.getReceiptHandle());
+ if (messageReceiptHandle != null) {
+ handleString = messageReceiptHandle.getReceiptHandleStr();
+ }
+ return handleString;
+ }
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
index b61c3df9e5..c63212c231 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/AbstractProcessor.java
@@ -27,6 +27,8 @@ public abstract class AbstractProcessor extends
AbstractStartAndShutdown {
protected MessagingProcessor messagingProcessor;
protected ServiceManager serviceManager;
+ protected static final ProxyException EXPIRED_HANDLE_PROXY_EXCEPTION = new
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is
expired");
+
public AbstractProcessor(MessagingProcessor messagingProcessor,
ServiceManager serviceManager) {
this.messagingProcessor = messagingProcessor;
@@ -35,7 +37,7 @@ public abstract class AbstractProcessor extends
AbstractStartAndShutdown {
protected void validateReceiptHandle(ReceiptHandle handle) {
if (handle.isExpired()) {
- throw new
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is
expired");
+ throw EXPIRED_HANDLE_PROXY_EXCEPTION;
}
}
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java
new file mode 100644
index 0000000000..dfb9c9b9e0
--- /dev/null
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/BatchAckResult.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.processor;
+
+import org.apache.rocketmq.client.consumer.AckResult;
+import org.apache.rocketmq.proxy.common.ProxyException;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
+
+public class BatchAckResult {
+
+ private final ReceiptHandleMessage receiptHandleMessage;
+ private AckResult ackResult;
+ private ProxyException proxyException;
+
+ public BatchAckResult(ReceiptHandleMessage receiptHandleMessage,
+ AckResult ackResult) {
+ this.receiptHandleMessage = receiptHandleMessage;
+ this.ackResult = ackResult;
+ }
+
+ public BatchAckResult(ReceiptHandleMessage receiptHandleMessage,
+ ProxyException proxyException) {
+ this.receiptHandleMessage = receiptHandleMessage;
+ this.proxyException = proxyException;
+ }
+
+ public ReceiptHandleMessage getReceiptHandleMessage() {
+ return receiptHandleMessage;
+ }
+
+ public AckResult getAckResult() {
+ return ackResult;
+ }
+
+ public ProxyException getProxyException() {
+ return proxyException;
+ }
+}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index 656a6339dc..f3522b3740 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -48,6 +48,7 @@ import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.common.utils.FutureUtils;
import org.apache.rocketmq.proxy.common.utils.ProxyUtils;
import org.apache.rocketmq.proxy.service.ServiceManager;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
@@ -241,6 +242,69 @@ public class ConsumerProcessor extends AbstractProcessor {
return FutureUtils.addExecutor(future, this.executor);
}
+ public CompletableFuture<List<BatchAckResult>> batchAckMessage(
+ ProxyContext ctx,
+ List<ReceiptHandleMessage> handleMessageList,
+ String consumerGroup,
+ String topic,
+ long timeoutMillis
+ ) {
+ CompletableFuture<List<BatchAckResult>> future = new
CompletableFuture<>();
+ try {
+ List<BatchAckResult> batchAckResultList = new
ArrayList<>(handleMessageList.size());
+ Map<String, List<ReceiptHandleMessage>> brokerHandleListMap = new
HashMap<>();
+
+ for (ReceiptHandleMessage handleMessage : handleMessageList) {
+ if (handleMessage.getReceiptHandle().isExpired()) {
+ batchAckResultList.add(new BatchAckResult(handleMessage,
EXPIRED_HANDLE_PROXY_EXCEPTION));
+ continue;
+ }
+ List<ReceiptHandleMessage> brokerHandleList =
brokerHandleListMap.computeIfAbsent(handleMessage.getReceiptHandle().getBrokerName(),
key -> new ArrayList<>());
+ brokerHandleList.add(handleMessage);
+ }
+
+ if (brokerHandleListMap.isEmpty()) {
+ return
FutureUtils.addExecutor(CompletableFuture.completedFuture(batchAckResultList),
this.executor);
+ }
+ Set<Map.Entry<String, List<ReceiptHandleMessage>>>
brokerHandleListMapEntrySet = brokerHandleListMap.entrySet();
+ CompletableFuture<List<BatchAckResult>>[] futures = new
CompletableFuture[brokerHandleListMapEntrySet.size()];
+ int futureIndex = 0;
+ for (Map.Entry<String, List<ReceiptHandleMessage>> entry :
brokerHandleListMapEntrySet) {
+ futures[futureIndex++] = processBrokerHandle(ctx,
consumerGroup, topic, entry.getValue(), timeoutMillis);
+ }
+ CompletableFuture.allOf(futures).whenComplete((val, throwable) -> {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ }
+ for (CompletableFuture<List<BatchAckResult>> resultFuture :
futures) {
+ batchAckResultList.addAll(resultFuture.join());
+ }
+ future.complete(batchAckResultList);
+ });
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ return FutureUtils.addExecutor(future, this.executor);
+ }
+
+ protected CompletableFuture<List<BatchAckResult>>
processBrokerHandle(ProxyContext ctx, String consumerGroup, String topic,
List<ReceiptHandleMessage> handleMessageList, long timeoutMillis) {
+ return this.serviceManager.getMessageService().batchAckMessage(ctx,
handleMessageList, consumerGroup, topic, timeoutMillis)
+ .thenApply(result -> {
+ List<BatchAckResult> results = new ArrayList<>();
+ for (ReceiptHandleMessage handleMessage : handleMessageList) {
+ results.add(new BatchAckResult(handleMessage, result));
+ }
+ return results;
+ })
+ .exceptionally(throwable -> {
+ List<BatchAckResult> results = new ArrayList<>();
+ for (ReceiptHandleMessage handleMessage : handleMessageList) {
+ results.add(new BatchAckResult(handleMessage, new
ProxyException(ProxyExceptionCode.INTERNAL_SERVER_ERROR,
throwable.getMessage(), throwable)));
+ }
+ return results;
+ });
+ }
+
public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx,
ReceiptHandle handle,
String messageId, String groupName, String topicName, long
invisibleTime, long timeoutMillis) {
CompletableFuture<AckResult> future = new CompletableFuture<>();
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 188cb7b9bd..ba150051bc 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.proxy.service.ServiceManager;
import org.apache.rocketmq.proxy.service.ServiceManagerFactory;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
@@ -183,6 +184,12 @@ public class DefaultMessagingProcessor extends
AbstractStartAndShutdown implemen
return this.consumerProcessor.ackMessage(ctx, handle, messageId,
consumerGroup, topic, timeoutMillis);
}
+ @Override
+ public CompletableFuture<List<BatchAckResult>>
batchAckMessage(ProxyContext ctx,
+ List<ReceiptHandleMessage> handleMessageList, String consumerGroup,
String topic, long timeoutMillis) {
+ return this.consumerProcessor.batchAckMessage(ctx, handleMessageList,
consumerGroup, topic, timeoutMillis);
+ }
+
@Override
public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx,
ReceiptHandle handle, String messageId,
String groupName, String topicName, long invisibleTime, long
timeoutMillis) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index d86be0bd88..2ae7418ba7 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -37,6 +37,7 @@ import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.common.utils.StartAndShutdown;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
import org.apache.rocketmq.proxy.service.route.ProxyTopicRouteData;
@@ -155,6 +156,23 @@ public interface MessagingProcessor extends
StartAndShutdown {
long timeoutMillis
);
+ default CompletableFuture<List<BatchAckResult>> batchAckMessage(
+ ProxyContext ctx,
+ List<ReceiptHandleMessage> handleMessageList,
+ String consumerGroup,
+ String topic
+ ) {
+ return batchAckMessage(ctx, handleMessageList, consumerGroup, topic,
DEFAULT_TIMEOUT_MILLS);
+ }
+
+ CompletableFuture<List<BatchAckResult>> batchAckMessage(
+ ProxyContext ctx,
+ List<ReceiptHandleMessage> handleMessageList,
+ String consumerGroup,
+ String topic,
+ long timeoutMillis
+ );
+
default CompletableFuture<AckResult> changeInvisibleTime(
ProxyContext ctx,
ReceiptHandle handle,
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
index 9f163f1b98..70b72deae1 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
@@ -20,9 +20,11 @@ import com.google.common.collect.Lists;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.Message;
@@ -31,7 +33,6 @@ import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
import org.apache.rocketmq.proxy.common.utils.FutureUtils;
-import org.apache.rocketmq.client.impl.mqclient.MQClientAPIFactory;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -137,6 +138,19 @@ public class ClusterMessageService implements
MessageService {
);
}
+ @Override
+ public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx,
List<ReceiptHandleMessage> handleList, String consumerGroup,
+ String topic, long timeoutMillis) {
+ List<String> extraInfoList = handleList.stream().map(message ->
message.getReceiptHandle().getReceiptHandle()).collect(Collectors.toList());
+ return this.mqClientAPIFactory.getClient().batchAckMessageAsync(
+ this.resolveBrokerAddrInReceiptHandle(ctx,
handleList.get(0).getReceiptHandle()),
+ topic,
+ consumerGroup,
+ extraInfoList,
+ timeoutMillis
+ );
+ }
+
@Override
public CompletableFuture<PullResult> pullMessage(ProxyContext ctx,
AddressableMessageQueue messageQueue,
PullMessageRequestHeader requestHeader, long timeoutMillis) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index eb2c4d9ee9..ca7dcc9eb0 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.proxy.service.message;
import io.netty.channel.ChannelHandlerContext;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -54,6 +55,8 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.body.BatchAck;
+import org.apache.rocketmq.remoting.protocol.body.BatchAckMessageRequestBody;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
@@ -364,6 +367,61 @@ public class LocalMessageService implements MessageService
{
});
}
+ @Override
+ public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx,
List<ReceiptHandleMessage> handleList,
+ String consumerGroup, String topic, long timeoutMillis) {
+ SimpleChannel channel = channelManager.createChannel(ctx);
+ ChannelHandlerContext channelHandlerContext =
channel.getChannelHandlerContext();
+ RemotingCommand command =
LocalRemotingCommand.createRequestCommand(RequestCode.BATCH_ACK_MESSAGE, null);
+
+ Map<String, BatchAck> batchAckMap = new HashMap<>();
+ for (ReceiptHandleMessage receiptHandleMessage : handleList) {
+ String extraInfo =
receiptHandleMessage.getReceiptHandle().getReceiptHandle();
+ String[] extraInfoData = ExtraInfoUtil.split(extraInfo);
+ String mergeKey = ExtraInfoUtil.getRetry(extraInfoData) + "@" +
+ ExtraInfoUtil.getQueueId(extraInfoData) + "@" +
+ ExtraInfoUtil.getCkQueueOffset(extraInfoData) + "@" +
+ ExtraInfoUtil.getPopTime(extraInfoData);
+ BatchAck bAck = batchAckMap.computeIfAbsent(mergeKey, k -> {
+ BatchAck newBatchAck = new BatchAck();
+ newBatchAck.setConsumerGroup(consumerGroup);
+ newBatchAck.setTopic(topic);
+ newBatchAck.setRetry(ExtraInfoUtil.getRetry(extraInfoData));
+
newBatchAck.setStartOffset(ExtraInfoUtil.getCkQueueOffset(extraInfoData));
+
newBatchAck.setQueueId(ExtraInfoUtil.getQueueId(extraInfoData));
+
newBatchAck.setReviveQueueId(ExtraInfoUtil.getReviveQid(extraInfoData));
+
newBatchAck.setPopTime(ExtraInfoUtil.getPopTime(extraInfoData));
+
newBatchAck.setInvisibleTime(ExtraInfoUtil.getInvisibleTime(extraInfoData));
+ newBatchAck.setBitSet(new BitSet());
+ return newBatchAck;
+ });
+ bAck.getBitSet().set((int)
(ExtraInfoUtil.getQueueOffset(extraInfoData) -
ExtraInfoUtil.getCkQueueOffset(extraInfoData)));
+ }
+ BatchAckMessageRequestBody requestBody = new
BatchAckMessageRequestBody();
+
requestBody.setBrokerName(brokerController.getBrokerConfig().getBrokerName());
+ requestBody.setAcks(new ArrayList<>(batchAckMap.values()));
+
+ command.setBody(requestBody.encode());
+ CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+ try {
+ RemotingCommand response =
brokerController.getAckMessageProcessor()
+ .processRequest(channelHandlerContext, command);
+ future.complete(response);
+ } catch (Exception e) {
+ log.error("Fail to process batchAckMessage command", e);
+ future.completeExceptionally(e);
+ }
+ return future.thenApply(r -> {
+ AckResult ackResult = new AckResult();
+ if (ResponseCode.SUCCESS == r.getCode()) {
+ ackResult.setStatus(AckStatus.OK);
+ } else {
+ ackResult.setStatus(AckStatus.NO_EXIST);
+ }
+ return ackResult;
+ });
+ }
+
@Override
public CompletableFuture<PullResult> pullMessage(ProxyContext ctx,
AddressableMessageQueue messageQueue,
PullMessageRequestHeader requestHeader, long timeoutMillis) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
index 15da171540..58a835adb4 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
@@ -91,6 +91,14 @@ public interface MessageService {
long timeoutMillis
);
+ CompletableFuture<AckResult> batchAckMessage(
+ ProxyContext ctx,
+ List<ReceiptHandleMessage> handleList,
+ String consumerGroup,
+ String topic,
+ long timeoutMillis
+ );
+
CompletableFuture<PullResult> pullMessage(
ProxyContext ctx,
AddressableMessageQueue messageQueue,
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java
new file mode 100644
index 0000000000..ae63fed491
--- /dev/null
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ReceiptHandleMessage.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.proxy.service.message;
+
+import org.apache.rocketmq.common.consumer.ReceiptHandle;
+
+public class ReceiptHandleMessage {
+
+ private final ReceiptHandle receiptHandle;
+ private final String messageId;
+
+ public ReceiptHandleMessage(ReceiptHandle receiptHandle, String messageId)
{
+ this.receiptHandle = receiptHandle;
+ this.messageId = messageId;
+ }
+
+ public ReceiptHandle getReceiptHandle() {
+ return receiptHandle;
+ }
+
+ public String getMessageId() {
+ return messageId;
+ }
+}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java
index 49fdfc6a8b..3c47461051 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/grpc/v2/consumer/AckMessageActivityTest.java
@@ -20,21 +20,32 @@ package org.apache.rocketmq.proxy.grpc.v2.consumer;
import apache.rocketmq.v2.AckMessageEntry;
import apache.rocketmq.v2.AckMessageRequest;
import apache.rocketmq.v2.AckMessageResponse;
+import apache.rocketmq.v2.AckMessageResultEntry;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.Resource;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.config.ConfigurationManager;
import org.apache.rocketmq.proxy.grpc.v2.BaseActivityTest;
+import org.apache.rocketmq.proxy.processor.BatchAckResult;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.stubbing.Answer;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
public class AckMessageActivityTest extends BaseActivityTest {
@@ -52,43 +63,197 @@ public class AckMessageActivityTest extends
BaseActivityTest {
@Test
public void testAckMessage() throws Throwable {
- when(this.messagingProcessor.ackMessage(any(), any(), eq("msg1"),
anyString(), anyString()))
+ ConfigurationManager.getProxyConfig().setEnableBatchAck(false);
+
+ String msg1 = "msg1";
+ String msg2 = "msg2";
+ String msg3 = "msg3";
+
+ when(this.messagingProcessor.ackMessage(any(), any(), eq(msg1),
anyString(), anyString()))
.thenThrow(new
ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, "receipt handle is
expired"));
AckResult msg2AckResult = new AckResult();
msg2AckResult.setStatus(AckStatus.OK);
- when(this.messagingProcessor.ackMessage(any(), any(), eq("msg2"),
anyString(), anyString()))
+ when(this.messagingProcessor.ackMessage(any(), any(), eq(msg2),
anyString(), anyString()))
.thenReturn(CompletableFuture.completedFuture(msg2AckResult));
AckResult msg3AckResult = new AckResult();
msg3AckResult.setStatus(AckStatus.NO_EXIST);
- when(this.messagingProcessor.ackMessage(any(), any(), eq("msg3"),
anyString(), anyString()))
+ when(this.messagingProcessor.ackMessage(any(), any(), eq(msg3),
anyString(), anyString()))
.thenReturn(CompletableFuture.completedFuture(msg3AckResult));
- AckMessageResponse response = this.ackMessageActivity.ackMessage(
- createContext(),
- AckMessageRequest.newBuilder()
- .setTopic(Resource.newBuilder().setName(TOPIC).build())
- .setGroup(Resource.newBuilder().setName(GROUP).build())
- .addEntries(AckMessageEntry.newBuilder()
- .setMessageId("msg1")
- .setReceiptHandle(buildReceiptHandle(TOPIC,
System.currentTimeMillis() - 10000, 1000))
- .build())
- .addEntries(AckMessageEntry.newBuilder()
- .setMessageId("msg2")
- .setReceiptHandle(buildReceiptHandle(TOPIC,
System.currentTimeMillis(), 3000))
- .build())
- .addEntries(AckMessageEntry.newBuilder()
- .setMessageId("msg3")
- .setReceiptHandle(buildReceiptHandle(TOPIC,
System.currentTimeMillis(), 3000))
- .build())
- .build()
- ).get();
-
- assertEquals(Code.MULTIPLE_RESULTS, response.getStatus().getCode());
- assertEquals(3, response.getEntriesCount());
- assertEquals(Code.INVALID_RECEIPT_HANDLE,
response.getEntries(0).getStatus().getCode());
- assertEquals(Code.OK, response.getEntries(1).getStatus().getCode());
- assertEquals(Code.INTERNAL_SERVER_ERROR,
response.getEntries(2).getStatus().getCode());
+ {
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(msg1)
+ .setReceiptHandle(buildReceiptHandle(TOPIC,
System.currentTimeMillis() - 10000, 1000))
+ .build())
+ .build()
+ ).get();
+ assertEquals(Code.INVALID_RECEIPT_HANDLE,
response.getStatus().getCode());
+ }
+ {
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(msg2)
+ .setReceiptHandle(buildReceiptHandle(TOPIC,
System.currentTimeMillis() - 10000, 1000))
+ .build())
+ .build()
+ ).get();
+ assertEquals(Code.OK, response.getStatus().getCode());
+ }
+ {
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(msg3)
+ .setReceiptHandle(buildReceiptHandle(TOPIC,
System.currentTimeMillis() - 10000, 1000))
+ .build())
+ .build()
+ ).get();
+ assertEquals(Code.INTERNAL_SERVER_ERROR,
response.getStatus().getCode());
+ }
+ {
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(msg1)
+ .setReceiptHandle(buildReceiptHandle(TOPIC,
System.currentTimeMillis() - 10000, 1000))
+ .build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(msg2)
+ .setReceiptHandle(buildReceiptHandle(TOPIC,
System.currentTimeMillis(), 3000))
+ .build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(msg3)
+ .setReceiptHandle(buildReceiptHandle(TOPIC,
System.currentTimeMillis(), 3000))
+ .build())
+ .build()
+ ).get();
+
+ assertEquals(Code.MULTIPLE_RESULTS,
response.getStatus().getCode());
+ assertEquals(3, response.getEntriesCount());
+ assertEquals(Code.INVALID_RECEIPT_HANDLE,
response.getEntries(0).getStatus().getCode());
+ assertEquals(Code.OK,
response.getEntries(1).getStatus().getCode());
+ assertEquals(Code.INTERNAL_SERVER_ERROR,
response.getEntries(2).getStatus().getCode());
+ }
+ }
+
+ @Test
+ public void testAckMessageInBatch() throws Throwable {
+ ConfigurationManager.getProxyConfig().setEnableBatchAck(true);
+
+ String successMessageId = "msg1";
+ String notOkMessageId = "msg2";
+ String exceptionMessageId = "msg3";
+
+ doAnswer((Answer<CompletableFuture<List<BatchAckResult>>>) invocation
-> {
+ List<ReceiptHandleMessage> receiptHandleMessageList =
invocation.getArgument(1, List.class);
+ List<BatchAckResult> batchAckResultList = new ArrayList<>();
+ for (ReceiptHandleMessage receiptHandleMessage :
receiptHandleMessageList) {
+ BatchAckResult batchAckResult;
+ if
(receiptHandleMessage.getMessageId().equals(successMessageId)) {
+ AckResult ackResult = new AckResult();
+ ackResult.setStatus(AckStatus.OK);
+ batchAckResult = new BatchAckResult(receiptHandleMessage,
ackResult);
+ } else if
(receiptHandleMessage.getMessageId().equals(notOkMessageId)) {
+ AckResult ackResult = new AckResult();
+ ackResult.setStatus(AckStatus.NO_EXIST);
+ batchAckResult = new BatchAckResult(receiptHandleMessage,
ackResult);
+ } else {
+ batchAckResult = new BatchAckResult(receiptHandleMessage,
new ProxyException(ProxyExceptionCode.INVALID_RECEIPT_HANDLE, ""));
+ }
+ batchAckResultList.add(batchAckResult);
+ }
+ return CompletableFuture.completedFuture(batchAckResultList);
+ }).when(this.messagingProcessor).batchAckMessage(any(), anyList(),
anyString(), anyString());
+
+ {
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(successMessageId)
+ .setReceiptHandle(buildReceiptHandle(TOPIC,
System.currentTimeMillis(), 3000))
+ .build())
+ .build()
+ ).get();
+ assertEquals(Code.OK, response.getStatus().getCode());
+ }
+ {
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(notOkMessageId)
+ .setReceiptHandle(buildReceiptHandle(TOPIC,
System.currentTimeMillis(), 3000))
+ .build())
+ .build()
+ ).get();
+ assertEquals(Code.INTERNAL_SERVER_ERROR,
response.getStatus().getCode());
+ }
+ {
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(exceptionMessageId)
+ .setReceiptHandle(buildReceiptHandle(TOPIC,
System.currentTimeMillis(), 3000))
+ .build())
+ .build()
+ ).get();
+ assertEquals(Code.INVALID_RECEIPT_HANDLE,
response.getStatus().getCode());
+ }
+ {
+ AckMessageResponse response = this.ackMessageActivity.ackMessage(
+ createContext(),
+ AckMessageRequest.newBuilder()
+ .setTopic(Resource.newBuilder().setName(TOPIC).build())
+ .setGroup(Resource.newBuilder().setName(GROUP).build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(successMessageId)
+ .setReceiptHandle(buildReceiptHandle(TOPIC,
System.currentTimeMillis(), 3000))
+ .build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(notOkMessageId)
+ .setReceiptHandle(buildReceiptHandle(TOPIC,
System.currentTimeMillis(), 3000))
+ .build())
+ .addEntries(AckMessageEntry.newBuilder()
+ .setMessageId(exceptionMessageId)
+ .setReceiptHandle(buildReceiptHandle(TOPIC,
System.currentTimeMillis(), 3000))
+ .build())
+ .build()
+ ).get();
+
+ assertEquals(Code.MULTIPLE_RESULTS,
response.getStatus().getCode());
+ assertEquals(3, response.getEntriesCount());
+ Map<String, Code> msgCode = new HashMap<>();
+ for (AckMessageResultEntry entry : response.getEntriesList()) {
+ msgCode.put(entry.getMessageId(), entry.getStatus().getCode());
+ }
+ assertEquals(Code.OK, msgCode.get(successMessageId));
+ assertEquals(Code.INTERNAL_SERVER_ERROR,
msgCode.get(notOkMessageId));
+ assertEquals(Code.INVALID_RECEIPT_HANDLE,
msgCode.get(exceptionMessageId));
+ }
}
}
\ No newline at end of file
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java
index 5c1ea9627e..072630e394 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/BaseProcessorTest.java
@@ -66,14 +66,6 @@ public class BaseProcessorTest extends InitConfigTest {
protected ProxyRelayService proxyRelayService;
@Mock
protected MetadataService metadataService;
- @Mock
- protected ProducerProcessor producerProcessor;
- @Mock
- protected ConsumerProcessor consumerProcessor;
- @Mock
- protected TransactionProcessor transactionProcessor;
- @Mock
- protected ClientProcessor clientProcessor;
public void before() throws Throwable {
super.before();
@@ -92,6 +84,13 @@ public class BaseProcessorTest extends InitConfigTest {
}
protected static MessageExt createMessageExt(String topic, String tags,
int reconsumeTimes, long invisibleTime) {
+ return createMessageExt(topic, tags, reconsumeTimes, invisibleTime,
System.currentTimeMillis(),
+ RANDOM.nextInt(Integer.MAX_VALUE),
RANDOM.nextInt(Integer.MAX_VALUE), RANDOM.nextInt(Integer.MAX_VALUE),
+ RANDOM.nextInt(Integer.MAX_VALUE), "mockBroker");
+ }
+
+ protected static MessageExt createMessageExt(String topic, String tags,
int reconsumeTimes, long invisibleTime, long popTime,
+ long startOffset, int reviveQid, int queueId, long queueOffset, String
brokerName) {
MessageExt messageExt = new MessageExt();
messageExt.setTopic(topic);
messageExt.setTags(tags);
@@ -100,8 +99,7 @@ public class BaseProcessorTest extends InitConfigTest {
messageExt.setMsgId(MessageClientIDSetter.createUniqID());
messageExt.setCommitLogOffset(RANDOM.nextInt(Integer.MAX_VALUE));
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_POP_CK,
- ExtraInfoUtil.buildExtraInfo(RANDOM.nextInt(Integer.MAX_VALUE),
System.currentTimeMillis(), invisibleTime,
- RANDOM.nextInt(Integer.MAX_VALUE), topic, "mockBroker",
RANDOM.nextInt(Integer.MAX_VALUE), RANDOM.nextInt(Integer.MAX_VALUE)));
+ ExtraInfoUtil.buildExtraInfo(startOffset, popTime, invisibleTime,
reviveQid, topic, brokerName, queueId, queueOffset));
return messageExt;
}
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
index 717e86fc05..db268a06e6 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ConsumerProcessorTest.java
@@ -20,8 +20,11 @@ package org.apache.rocketmq.proxy.processor;
import com.google.common.collect.Sets;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
@@ -39,7 +42,10 @@ import
org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.proxy.common.ProxyContext;
+import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
+import org.apache.rocketmq.proxy.common.utils.FutureUtils;
import org.apache.rocketmq.proxy.common.utils.ProxyUtils;
+import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.MessageQueueView;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -50,16 +56,22 @@ import
org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class ConsumerProcessorTest extends BaseProcessorTest {
@@ -162,6 +174,109 @@ public class ConsumerProcessorTest extends
BaseProcessorTest {
assertEquals(handle.getReceiptHandle(),
requestHeaderArgumentCaptor.getValue().getExtraInfo());
}
+ @Test
+ public void testBatchAckExpireMessage() throws Throwable {
+ String brokerName1 = "brokerName1";
+
+ List<ReceiptHandleMessage> receiptHandleMessageList = new
ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ MessageExt expireMessage = createMessageExt(TOPIC, "", 0, 3000,
System.currentTimeMillis() - 10000,
+ 0, 0, 0, i, brokerName1);
+ ReceiptHandle expireHandle = create(expireMessage);
+ receiptHandleMessageList.add(new
ReceiptHandleMessage(expireHandle, expireMessage.getMsgId()));
+ }
+
+ List<BatchAckResult> batchAckResultList =
this.consumerProcessor.batchAckMessage(createContext(),
receiptHandleMessageList, CONSUMER_GROUP, TOPIC, 3000).get();
+
+ verify(this.messageService, never()).batchAckMessage(any(), anyList(),
anyString(), anyString(), anyLong());
+ assertEquals(receiptHandleMessageList.size(),
batchAckResultList.size());
+ for (BatchAckResult batchAckResult : batchAckResultList) {
+ assertNull(batchAckResult.getAckResult());
+ assertNotNull(batchAckResult.getProxyException());
+ assertNotNull(batchAckResult.getReceiptHandleMessage());
+ }
+
+ }
+
+ @Test
+ public void testBatchAckMessage() throws Throwable {
+ String brokerName1 = "brokerName1";
+ String brokerName2 = "brokerName2";
+ String errThrowBrokerName = "errThrowBrokerName";
+ MessageExt expireMessage = createMessageExt(TOPIC, "", 0, 3000,
System.currentTimeMillis() - 10000,
+ 0, 0, 0, 0, brokerName1);
+ ReceiptHandle expireHandle = create(expireMessage);
+
+ List<ReceiptHandleMessage> receiptHandleMessageList = new
ArrayList<>();
+ receiptHandleMessageList.add(new ReceiptHandleMessage(expireHandle,
expireMessage.getMsgId()));
+ List<String> broker1Msg = new ArrayList<>();
+ List<String> broker2Msg = new ArrayList<>();
+
+ long now = System.currentTimeMillis();
+ int msgNum = 3;
+ for (int i = 0; i < msgNum; i++) {
+ MessageExt brokerMessage = createMessageExt(TOPIC, "", 0, 3000,
now,
+ 0, 0, 0, i + 1, brokerName1);
+ ReceiptHandle brokerHandle = create(brokerMessage);
+ receiptHandleMessageList.add(new
ReceiptHandleMessage(brokerHandle, brokerMessage.getMsgId()));
+ broker1Msg.add(brokerMessage.getMsgId());
+ }
+ for (int i = 0; i < msgNum; i++) {
+ MessageExt brokerMessage = createMessageExt(TOPIC, "", 0, 3000,
now,
+ 0, 0, 0, i + 1, brokerName2);
+ ReceiptHandle brokerHandle = create(brokerMessage);
+ receiptHandleMessageList.add(new
ReceiptHandleMessage(brokerHandle, brokerMessage.getMsgId()));
+ broker2Msg.add(brokerMessage.getMsgId());
+ }
+
+ // for this message, will throw exception in batchAckMessage
+ MessageExt errThrowMessage = createMessageExt(TOPIC, "", 0, 3000, now,
+ 0, 0, 0, 0, errThrowBrokerName);
+ ReceiptHandle errThrowHandle = create(errThrowMessage);
+ receiptHandleMessageList.add(new ReceiptHandleMessage(errThrowHandle,
errThrowMessage.getMsgId()));
+
+ Collections.shuffle(receiptHandleMessageList);
+
+ doAnswer((Answer<CompletableFuture<AckResult>>) invocation -> {
+ List<ReceiptHandleMessage> handleMessageList =
invocation.getArgument(1, List.class);
+ AckResult ackResult = new AckResult();
+ String brokerName =
handleMessageList.get(0).getReceiptHandle().getBrokerName();
+ if (brokerName.equals(brokerName1)) {
+ ackResult.setStatus(AckStatus.OK);
+ } else if (brokerName.equals(brokerName2)) {
+ ackResult.setStatus(AckStatus.NO_EXIST);
+ } else {
+ return FutureUtils.completeExceptionally(new
RuntimeException());
+ }
+
+ return CompletableFuture.completedFuture(ackResult);
+ }).when(this.messageService).batchAckMessage(any(), anyList(),
anyString(), anyString(), anyLong());
+
+ List<BatchAckResult> batchAckResultList =
this.consumerProcessor.batchAckMessage(createContext(),
receiptHandleMessageList, CONSUMER_GROUP, TOPIC, 3000).get();
+ assertEquals(receiptHandleMessageList.size(),
batchAckResultList.size());
+
+ // check ackResult for each msg
+ Map<String, BatchAckResult> msgBatchAckResult = new HashMap<>();
+ for (BatchAckResult batchAckResult : batchAckResultList) {
+
msgBatchAckResult.put(batchAckResult.getReceiptHandleMessage().getMessageId(),
batchAckResult);
+ }
+ for (String msgId : broker1Msg) {
+ assertEquals(AckStatus.OK,
msgBatchAckResult.get(msgId).getAckResult().getStatus());
+ assertNull(msgBatchAckResult.get(msgId).getProxyException());
+ }
+ for (String msgId : broker2Msg) {
+ assertEquals(AckStatus.NO_EXIST,
msgBatchAckResult.get(msgId).getAckResult().getStatus());
+ assertNull(msgBatchAckResult.get(msgId).getProxyException());
+ }
+
assertNotNull(msgBatchAckResult.get(expireMessage.getMsgId()).getProxyException());
+ assertEquals(ProxyExceptionCode.INVALID_RECEIPT_HANDLE,
msgBatchAckResult.get(expireMessage.getMsgId()).getProxyException().getCode());
+
assertNull(msgBatchAckResult.get(expireMessage.getMsgId()).getAckResult());
+
+
assertNotNull(msgBatchAckResult.get(errThrowMessage.getMsgId()).getProxyException());
+ assertEquals(ProxyExceptionCode.INTERNAL_SERVER_ERROR,
msgBatchAckResult.get(errThrowMessage.getMsgId()).getProxyException().getCode());
+
assertNull(msgBatchAckResult.get(errThrowMessage.getMsgId()).getAckResult());
+ }
+
@Test
public void testChangeInvisibleTime() throws Throwable {
ReceiptHandle handle =
create(createMessageExt(MixAll.RETRY_GROUP_TOPIC_PREFIX + TOPIC, "", 0, 3000));
diff --git
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
index 77a119a296..3f3a4ae40c 100644
---
a/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
+++
b/proxy/src/test/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExtTest.java
@@ -220,6 +220,18 @@ public class MQClientAPIExtTest {
assertSame(ackResult, mqClientAPI.ackMessageAsync(BROKER_ADDR, new
AckMessageRequestHeader(), TIMEOUT).get());
}
+ @Test
+ public void testBatchAckMessageAsync() throws Exception {
+ AckResult ackResult = new AckResult();
+ doAnswer((Answer<Void>) mock -> {
+ AckCallback ackCallback = mock.getArgument(2);
+ ackCallback.onSuccess(ackResult);
+ return null;
+ }).when(mqClientAPI).batchAckMessageAsync(anyString(), anyLong(),
any(AckCallback.class), any());
+
+ assertSame(ackResult, mqClientAPI.batchAckMessageAsync(BROKER_ADDR,
TOPIC, CONSUMER_GROUP, new ArrayList<>(), TIMEOUT).get());
+ }
+
@Test
public void testChangeInvisibleTimeAsync() throws Exception {
AckResult ackResult = new AckResult();