This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 702bbd2b83 [ISSUE #7803] Add try catch for lock and unlock (#7804)
702bbd2b83 is described below
commit 702bbd2b831d4fd97a5ad00f9587e79a0a3e5fda
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Sun Feb 4 16:14:40 2024 +0800
[ISSUE #7803] Add try catch for lock and unlock (#7804)
* Add try catch for lock and unlock
---
.../proxy/processor/ConsumerProcessor.java | 90 +++++++++++++---------
1 file changed, 53 insertions(+), 37 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index 7870233576..3ff3423701 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -414,49 +414,59 @@ public class ConsumerProcessor extends AbstractProcessor {
public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx,
Set<MessageQueue> mqSet,
String consumerGroup, String clientId, long timeoutMillis) {
CompletableFuture<Set<MessageQueue>> future = new
CompletableFuture<>();
- Set<MessageQueue> successSet = new CopyOnWriteArraySet<>();
- Set<AddressableMessageQueue> addressableMessageQueueSet =
buildAddressableSet(ctx, mqSet);
- Map<String, List<AddressableMessageQueue>> messageQueueSetMap =
buildAddressableMapByBrokerName(addressableMessageQueueSet);
- List<CompletableFuture<Void>> futureList = new ArrayList<>();
- messageQueueSetMap.forEach((k, v) -> {
- LockBatchRequestBody requestBody = new LockBatchRequestBody();
- requestBody.setConsumerGroup(consumerGroup);
- requestBody.setClientId(clientId);
-
requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet()));
- CompletableFuture<Void> future0 =
serviceManager.getMessageService()
- .lockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis)
- .thenAccept(successSet::addAll);
- futureList.add(FutureUtils.addExecutor(future0, this.executor));
- });
- CompletableFuture.allOf(futureList.toArray(new
CompletableFuture[0])).whenComplete((v, t) -> {
- if (t != null) {
- log.error("LockBatchMQ failed", t);
- }
- future.complete(successSet);
- });
+ try {
+ Set<MessageQueue> successSet = new CopyOnWriteArraySet<>();
+ Set<AddressableMessageQueue> addressableMessageQueueSet =
buildAddressableSet(ctx, mqSet);
+ Map<String, List<AddressableMessageQueue>> messageQueueSetMap =
buildAddressableMapByBrokerName(addressableMessageQueueSet);
+ List<CompletableFuture<Void>> futureList = new ArrayList<>();
+ messageQueueSetMap.forEach((k, v) -> {
+ LockBatchRequestBody requestBody = new LockBatchRequestBody();
+ requestBody.setConsumerGroup(consumerGroup);
+ requestBody.setClientId(clientId);
+
requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet()));
+ CompletableFuture<Void> future0 =
serviceManager.getMessageService()
+ .lockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis)
+ .thenAccept(successSet::addAll);
+ futureList.add(FutureUtils.addExecutor(future0,
this.executor));
+ });
+ CompletableFuture.allOf(futureList.toArray(new
CompletableFuture[0])).whenComplete((v, t) -> {
+ if (t != null) {
+ log.error("LockBatchMQ failed, group={}", consumerGroup,
t);
+ }
+ future.complete(successSet);
+ });
+ } catch (Throwable t) {
+ log.error("LockBatchMQ exception, group={}", consumerGroup, t);
+ future.completeExceptionally(t);
+ }
return FutureUtils.addExecutor(future, this.executor);
}
public CompletableFuture<Void> unlockBatchMQ(ProxyContext ctx,
Set<MessageQueue> mqSet,
String consumerGroup, String clientId, long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
- Set<AddressableMessageQueue> addressableMessageQueueSet =
buildAddressableSet(ctx, mqSet);
- Map<String, List<AddressableMessageQueue>> messageQueueSetMap =
buildAddressableMapByBrokerName(addressableMessageQueueSet);
- List<CompletableFuture<Void>> futureList = new ArrayList<>();
- messageQueueSetMap.forEach((k, v) -> {
- UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
- requestBody.setConsumerGroup(consumerGroup);
- requestBody.setClientId(clientId);
-
requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet()));
- CompletableFuture<Void> future0 =
serviceManager.getMessageService().unlockBatchMQ(ctx, v.get(0), requestBody,
timeoutMillis);
- futureList.add(FutureUtils.addExecutor(future0, this.executor));
- });
- CompletableFuture.allOf(futureList.toArray(new
CompletableFuture[0])).whenComplete((v, t) -> {
- if (t != null) {
- log.error("UnlockBatchMQ failed", t);
- }
- future.complete(null);
- });
+ try {
+ Set<AddressableMessageQueue> addressableMessageQueueSet =
buildAddressableSet(ctx, mqSet);
+ Map<String, List<AddressableMessageQueue>> messageQueueSetMap =
buildAddressableMapByBrokerName(addressableMessageQueueSet);
+ List<CompletableFuture<Void>> futureList = new ArrayList<>();
+ messageQueueSetMap.forEach((k, v) -> {
+ UnlockBatchRequestBody requestBody = new
UnlockBatchRequestBody();
+ requestBody.setConsumerGroup(consumerGroup);
+ requestBody.setClientId(clientId);
+
requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet()));
+ CompletableFuture<Void> future0 =
serviceManager.getMessageService().unlockBatchMQ(ctx, v.get(0), requestBody,
timeoutMillis);
+ futureList.add(FutureUtils.addExecutor(future0,
this.executor));
+ });
+ CompletableFuture.allOf(futureList.toArray(new
CompletableFuture[0])).whenComplete((v, t) -> {
+ if (t != null) {
+ log.error("UnlockBatchMQ failed, group={}", consumerGroup,
t);
+ }
+ future.complete(null);
+ });
+ } catch (Throwable t) {
+ log.error("UnlockBatchMQ exception, group={}", consumerGroup, t);
+ future.completeExceptionally(t);
+ }
return FutureUtils.addExecutor(future, this.executor);
}
@@ -505,7 +515,13 @@ public class ConsumerProcessor extends AbstractProcessor {
protected HashMap<String, List<AddressableMessageQueue>>
buildAddressableMapByBrokerName(
final Set<AddressableMessageQueue> mqSet) {
HashMap<String, List<AddressableMessageQueue>> result = new
HashMap<>();
+ if (mqSet == null) {
+ return result;
+ }
for (AddressableMessageQueue mq : mqSet) {
+ if (mq == null) {
+ continue;
+ }
List<AddressableMessageQueue> mqs =
result.computeIfAbsent(mq.getBrokerName(), k -> new ArrayList<>());
mqs.add(mq);
}