This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6c3781f17e [ISSUE #8365] add non-oneway updateConsumerOffset (#8368)
6c3781f17e is described below
commit 6c3781f17e22ec35ddb2113bab5cdc4967cb8260
Author: 吴星灿 <[email protected]>
AuthorDate: Fri Jul 12 13:47:58 2024 +0800
[ISSUE #8365] add non-oneway updateConsumerOffset (#8368)
---
.../client/impl/mqclient/MQClientAPIExt.java | 27 ++++++++++++++++
.../client/impl/mqclient/MQClientAPIExtTest.java | 37 ++++++++++++++++++++++
2 files changed, 64 insertions(+)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
index b97e00c577..0e2092b8a0 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExt.java
@@ -400,6 +400,33 @@ public class MQClientAPIExt extends MQClientAPIImpl {
return future;
}
+ public CompletableFuture<Void> updateConsumerOffsetAsync(
+ String brokerAddr,
+ UpdateConsumerOffsetRequestHeader header,
+ long timeoutMillis
+ ) {
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET,
header);
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ invoke(brokerAddr, request, timeoutMillis).whenComplete((response, t)
-> {
+ if (t != null) {
+ log.error("updateConsumerOffsetAsync failed, brokerAddr={},
requestHeader={}", brokerAddr, header, t);
+ future.completeExceptionally(t);
+ return;
+ }
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ future.complete(null);
+ }
+ case ResponseCode.SYSTEM_ERROR:
+ case ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST:
+ case ResponseCode.TOPIC_NOT_EXIST: {
+ future.completeExceptionally(new
MQBrokerException(response.getCode(), response.getRemark()));
+ }
+ }
+ });
+ return future;
+ }
+
public CompletableFuture<List<String>> getConsumerListByGroupAsync(
String brokerAddr,
GetConsumerListByGroupRequestHeader requestHeader,
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java
index 752bc98eab..6f692dff95 100644
---
a/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/impl/mqclient/MQClientAPIExtTest.java
@@ -18,14 +18,19 @@
package org.apache.rocketmq.client.impl.mqclient;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import org.apache.rocketmq.client.ClientConfig;
+import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.utils.FutureUtils;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -34,9 +39,12 @@ import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doReturn;
@RunWith(MockitoJUnitRunner.class)
public class MQClientAPIExtTest {
@@ -71,4 +79,33 @@ public class MQClientAPIExtTest {
CompletableFuture<SendResult> future =
mqClientAPIExt.sendMessageAsync("127.0.0.1:10911", "test", msg, requestHeader,
10);
assertThatThrownBy(future::get).getCause().isInstanceOf(RemotingTimeoutException.class);
}
+
+ @Test
+ public void testUpdateConsumerOffsetAsync_Success() throws
ExecutionException, InterruptedException {
+ CompletableFuture<RemotingCommand> remotingFuture = new
CompletableFuture<>();
+
remotingFuture.complete(RemotingCommand.createResponseCommand(ResponseCode.SUCCESS,
""));
+ doReturn(remotingFuture).when(remotingClientMock).invoke(anyString(),
any(RemotingCommand.class), anyLong());
+
+ CompletableFuture<Void> future =
mqClientAPIExt.updateConsumerOffsetAsync("brokerAddr", new
UpdateConsumerOffsetRequestHeader(), 3000L);
+
+ assertNull("Future should be completed without exception",
future.get());
+ }
+
+ @Test
+ public void testUpdateConsumerOffsetAsync_Fail() throws
InterruptedException {
+
+ CompletableFuture<RemotingCommand> remotingFuture = new
CompletableFuture<>();
+
remotingFuture.complete(RemotingCommand.createResponseCommand(ResponseCode.SYSTEM_ERROR,
"QueueId is null, topic is testTopic"));
+ doReturn(remotingFuture).when(remotingClientMock).invoke(anyString(),
any(RemotingCommand.class), anyLong());
+
+ CompletableFuture<Void> future =
mqClientAPIExt.updateConsumerOffsetAsync("brokerAddr", new
UpdateConsumerOffsetRequestHeader(), 3000L);
+
+ try {
+ future.get();
+ } catch (ExecutionException e) {
+ MQBrokerException customEx = (MQBrokerException) e.getCause();
+ assertEquals(customEx.getResponseCode(),
ResponseCode.SYSTEM_ERROR);
+ assertEquals(customEx.getErrorMessage(), "QueueId is null, topic
is testTopic");
+ }
+ }
}
\ No newline at end of file