This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1588d65762 [ISSUE #8365] add remoting client non-oneway updateConsumerOffset function (#8391)
1588d65762 is described below
commit 1588d6576295963787a99ab8d8754adf458bf329
Author: 吴星灿 <[email protected]>
AuthorDate: Wed Jul 17 15:41:45 2024 +0800
[ISSUE #8365] add remoting client non-oneway updateConsumerOffset function (#8391)
* add non-oneway updateConsumerOffset
---
.../proxy/processor/ConsumerProcessor.java | 29 ++++++++++++++++++----
.../proxy/processor/DefaultMessagingProcessor.java | 11 ++++++--
.../proxy/processor/MessagingProcessor.java | 16 +++++++++---
.../service/message/ClusterMessageService.java | 15 +++++++++--
.../proxy/service/message/LocalMessageService.java | 6 +++++
.../proxy/service/message/MessageService.java | 7 ++++++
6 files changed, 72 insertions(+), 12 deletions(-)
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index 24fc0a2a28..ace8af1b99 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -40,12 +40,12 @@ import
org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.FutureUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
-import org.apache.rocketmq.common.utils.FutureUtils;
import org.apache.rocketmq.proxy.common.utils.ProxyUtils;
import org.apache.rocketmq.proxy.service.ServiceManager;
import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
@@ -96,7 +96,7 @@ public class ConsumerProcessor extends AbstractProcessor {
}
return popMessage(ctx, messageQueue, consumerGroup, topic,
maxMsgNums, invisibleTime, pollTime, initMode,
subscriptionData, fifo, popMessageResultFilter, attemptId,
timeoutMillis);
- } catch (Throwable t) {
+ } catch (Throwable t) {
future.completeExceptionally(t);
}
return future;
@@ -287,7 +287,8 @@ public class ConsumerProcessor extends AbstractProcessor {
return FutureUtils.addExecutor(future, this.executor);
}
- protected CompletableFuture<List<BatchAckResult>>
processBrokerHandle(ProxyContext ctx, String consumerGroup, String topic,
List<ReceiptHandleMessage> handleMessageList, long timeoutMillis) {
+ protected CompletableFuture<List<BatchAckResult>>
processBrokerHandle(ProxyContext ctx, String consumerGroup,
+ String topic, List<ReceiptHandleMessage> handleMessageList, long
timeoutMillis) {
return this.serviceManager.getMessageService().batchAckMessage(ctx,
handleMessageList, consumerGroup, topic, timeoutMillis)
.thenApply(result -> {
List<BatchAckResult> results = new ArrayList<>();
@@ -393,6 +394,24 @@ public class ConsumerProcessor extends AbstractProcessor {
return FutureUtils.addExecutor(future, this.executor);
}
+ public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx,
MessageQueue messageQueue,
+ String consumerGroup, long commitOffset, long timeoutMillis) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ try {
+ AddressableMessageQueue addressableMessageQueue =
serviceManager.getTopicRouteService()
+ .buildAddressableMessageQueue(ctx, messageQueue);
+ UpdateConsumerOffsetRequestHeader requestHeader = new
UpdateConsumerOffsetRequestHeader();
+ requestHeader.setConsumerGroup(consumerGroup);
+ requestHeader.setTopic(addressableMessageQueue.getTopic());
+ requestHeader.setQueueId(addressableMessageQueue.getQueueId());
+ requestHeader.setCommitOffset(commitOffset);
+ future =
serviceManager.getMessageService().updateConsumerOffsetAsync(ctx,
addressableMessageQueue, requestHeader, timeoutMillis);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ return FutureUtils.addExecutor(future, this.executor);
+ }
+
public CompletableFuture<Long> queryConsumerOffset(ProxyContext ctx,
MessageQueue messageQueue,
String consumerGroup, long timeoutMillis) {
CompletableFuture<Long> future = new CompletableFuture<>();
@@ -501,9 +520,9 @@ public class ConsumerProcessor extends AbstractProcessor {
protected Set<AddressableMessageQueue> buildAddressableSet(ProxyContext
ctx, Set<MessageQueue> mqSet) {
Set<AddressableMessageQueue> addressableMessageQueueSet = new
HashSet<>(mqSet.size());
- for (MessageQueue mq:mqSet) {
+ for (MessageQueue mq : mqSet) {
try {
-
addressableMessageQueueSet.add(serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx,
mq)) ;
+
addressableMessageQueueSet.add(serviceManager.getTopicRouteService().buildAddressableMessageQueue(ctx,
mq));
} catch (Exception e) {
log.error("build addressable message queue fail, messageQueue
= {}", mq, e);
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 48a732c284..9c494d7a45 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -75,7 +75,7 @@ public class DefaultMessagingProcessor extends
AbstractStartAndShutdown implemen
protected ThreadPoolExecutor producerProcessorExecutor;
protected ThreadPoolExecutor consumerProcessorExecutor;
protected static final String ROCKETMQ_HOME =
System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY,
- System.getenv(MixAll.ROCKETMQ_HOME_ENV));
+ System.getenv(MixAll.ROCKETMQ_HOME_ENV));
protected DefaultMessagingProcessor(ServiceManager serviceManager) {
ProxyConfig proxyConfig = ConfigurationManager.getProxyConfig();
@@ -167,7 +167,8 @@ public class DefaultMessagingProcessor extends
AbstractStartAndShutdown implemen
}
@Override
- public CompletableFuture<Void> endTransaction(ProxyContext ctx, String
topic, String transactionId, String messageId, String producerGroup,
+ public CompletableFuture<Void> endTransaction(ProxyContext ctx, String
topic, String transactionId,
+ String messageId, String producerGroup,
TransactionStatus transactionStatus, boolean fromTransactionCheck,
long timeoutMillis) {
return this.transactionProcessor.endTransaction(ctx, topic,
transactionId, messageId, producerGroup, transactionStatus,
fromTransactionCheck, timeoutMillis);
@@ -225,6 +226,12 @@ public class DefaultMessagingProcessor extends
AbstractStartAndShutdown implemen
return this.consumerProcessor.updateConsumerOffset(ctx, messageQueue,
consumerGroup, commitOffset, timeoutMillis);
}
+ @Override
+ public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx,
MessageQueue messageQueue,
+ String consumerGroup, long commitOffset, long timeoutMillis) {
+ return this.consumerProcessor.updateConsumerOffsetAsync(ctx,
messageQueue, consumerGroup, commitOffset, timeoutMillis);
+ }
+
@Override
public CompletableFuture<Long> queryConsumerOffset(ProxyContext ctx,
MessageQueue messageQueue,
String consumerGroup, long timeoutMillis) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
index 213d2beeea..03d28262d7 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/MessagingProcessor.java
@@ -33,10 +33,10 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.common.Address;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
-import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.service.message.ReceiptHandleMessage;
import org.apache.rocketmq.proxy.service.metadata.MetadataService;
import org.apache.rocketmq.proxy.service.relay.ProxyRelayService;
@@ -217,6 +217,14 @@ public interface MessagingProcessor extends
StartAndShutdown {
long timeoutMillis
);
+ CompletableFuture<Void> updateConsumerOffsetAsync(
+ ProxyContext ctx,
+ MessageQueue messageQueue,
+ String consumerGroup,
+ long commitOffset,
+ long timeoutMillis
+ );
+
CompletableFuture<Long> queryConsumerOffset(
ProxyContext ctx,
MessageQueue messageQueue,
@@ -321,7 +329,9 @@ public interface MessagingProcessor extends
StartAndShutdown {
MetadataService getMetadataService();
- void addReceiptHandle(ProxyContext ctx, Channel channel, String group,
String msgID, MessageReceiptHandle messageReceiptHandle);
+ void addReceiptHandle(ProxyContext ctx, Channel channel, String group,
String msgID,
+ MessageReceiptHandle messageReceiptHandle);
- MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel
channel, String group, String msgID, String receiptHandle);
+ MessageReceiptHandle removeReceiptHandle(ProxyContext ctx, Channel
channel, String group, String msgID,
+ String receiptHandle);
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
index ba7d5ad8e2..f9eb94fcfc 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/ClusterMessageService.java
@@ -29,10 +29,10 @@ import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.utils.FutureUtils;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.ProxyException;
import org.apache.rocketmq.proxy.common.ProxyExceptionCode;
-import org.apache.rocketmq.common.utils.FutureUtils;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.proxy.service.route.TopicRouteService;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -139,7 +139,8 @@ public class ClusterMessageService implements
MessageService {
}
@Override
- public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx,
List<ReceiptHandleMessage> handleList, String consumerGroup,
+ public CompletableFuture<AckResult> batchAckMessage(ProxyContext ctx,
List<ReceiptHandleMessage> handleList,
+ String consumerGroup,
String topic, long timeoutMillis) {
List<String> extraInfoList = handleList.stream().map(message ->
message.getReceiptHandle().getReceiptHandle()).collect(Collectors.toList());
return this.mqClientAPIFactory.getClient().batchAckMessageAsync(
@@ -181,6 +182,16 @@ public class ClusterMessageService implements
MessageService {
);
}
+ @Override
+ public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx,
AddressableMessageQueue messageQueue,
+ UpdateConsumerOffsetRequestHeader requestHeader, long timeoutMillis) {
+ return this.mqClientAPIFactory.getClient().updateConsumerOffsetAsync(
+ messageQueue.getBrokerAddr(),
+ requestHeader,
+ timeoutMillis
+ );
+ }
+
@Override
public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx,
AddressableMessageQueue messageQueue,
LockBatchRequestBody requestBody, long timeoutMillis) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
index aaa688fee6..6b2ba02f7c 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/LocalMessageService.java
@@ -440,6 +440,12 @@ public class LocalMessageService implements MessageService
{
throw new NotImplementedException("updateConsumerOffset is not
implemented in LocalMessageService");
}
+ @Override
+ public CompletableFuture<Void> updateConsumerOffsetAsync(ProxyContext ctx,
AddressableMessageQueue messageQueue,
+ UpdateConsumerOffsetRequestHeader requestHeader, long timeoutMillis) {
+ throw new NotImplementedException("updateConsumerOffsetAsync is not
implemented in LocalMessageService");
+ }
+
@Override
public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx,
AddressableMessageQueue messageQueue,
LockBatchRequestBody requestBody, long timeoutMillis) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
index 58a835adb4..61accbc041 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/message/MessageService.java
@@ -120,6 +120,13 @@ public interface MessageService {
long timeoutMillis
);
+ CompletableFuture<Void> updateConsumerOffsetAsync(
+ ProxyContext ctx,
+ AddressableMessageQueue messageQueue,
+ UpdateConsumerOffsetRequestHeader requestHeader,
+ long timeoutMillis
+ );
+
CompletableFuture<Set<MessageQueue>> lockBatchMQ(
ProxyContext ctx,
AddressableMessageQueue messageQueue,