This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 23a8ed490 [ISSUE #6644] Add admin client future interface (#6646)
23a8ed490 is described below
commit 23a8ed490fd8e6e960914b11af127dfce323fd22
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Fri Apr 28 17:34:16 2023 +0800
[ISSUE #6644] Add admin client future interface (#6646)
* [ISSUE #6644] Add admin client future interface
* add unit test
* Change exception to throwable
* Add interface for MqClientAdmin
---
.../org/apache/rocketmq/client/MqClientAdmin.java | 110 ++++++
.../client/impl/admin/MqClientAdminImpl.java | 438 +++++++++++++++++++++
.../proxy/service/mqclient/MQClientAPIExt.java | 23 +-
.../apache/rocketmq/remoting/RemotingClient.java | 26 ++
.../remoting/netty/NettyRemotingClientTest.java | 90 ++++-
5 files changed, 672 insertions(+), 15 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/MqClientAdmin.java b/client/src/main/java/org/apache/rocketmq/client/MqClientAdmin.java
new file mode 100644
index 000000000..4eb74c0ca
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/MqClientAdmin.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
+import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.GroupList;
+import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.remoting.protocol.body.TopicList;
+import
org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.QueryConsumeTimeSpanRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteTopicFromNamesrvRequestHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+
+public interface MqClientAdmin {
+ CompletableFuture<List<MessageExt>> queryMessage(String address, boolean
uniqueKeyFlag, boolean decompressBody,
+ QueryMessageRequestHeader requestHeader, long timeoutMillis);
+
+ CompletableFuture<TopicStatsTable> getTopicStatsInfo(String address,
+ GetTopicStatsInfoRequestHeader requestHeader, long timeoutMillis);
+
+ CompletableFuture<List<QueueTimeSpan>> queryConsumeTimeSpan(String address,
+ QueryConsumeTimeSpanRequestHeader requestHeader, long timeoutMillis);
+
+ CompletableFuture<Void> updateOrCreateTopic(String address,
CreateTopicRequestHeader requestHeader,
+ long timeoutMillis);
+
+ CompletableFuture<Void> updateOrCreateSubscriptionGroup(String address,
SubscriptionGroupConfig config,
+ long timeoutMillis);
+
+ CompletableFuture<Void> deleteTopicInBroker(String address,
DeleteTopicRequestHeader requestHeader,
+ long timeoutMillis);
+
+ CompletableFuture<Void> deleteTopicInNameserver(String address,
DeleteTopicFromNamesrvRequestHeader requestHeader,
+ long timeoutMillis);
+
+ CompletableFuture<Void> deleteKvConfig(String address,
DeleteKVConfigRequestHeader requestHeader,
+ long timeoutMillis);
+
+ CompletableFuture<Void> deleteSubscriptionGroup(String address,
DeleteSubscriptionGroupRequestHeader requestHeader,
+ long timeoutMillis);
+
+ CompletableFuture<Map<MessageQueue, Long>>
invokeBrokerToResetOffset(String address,
+ ResetOffsetRequestHeader requestHeader, long timeoutMillis);
+
+ CompletableFuture<MessageExt> viewMessage(String address,
ViewMessageRequestHeader requestHeader,
+ long timeoutMillis);
+
+ CompletableFuture<ClusterInfo> getBrokerClusterInfo(String address, long
timeoutMillis);
+
+ CompletableFuture<ConsumerConnection> getConsumerConnectionList(String
address,
+ GetConsumerConnectionListRequestHeader requestHeader, long
timeoutMillis);
+
+ CompletableFuture<TopicList> queryTopicsByConsumer(String address,
+ QueryTopicsByConsumerRequestHeader requestHeader, long timeoutMillis);
+
+ CompletableFuture<SubscriptionData> querySubscriptionByConsumer(String
address,
+ QuerySubscriptionByConsumerRequestHeader requestHeader, long
timeoutMillis);
+
+ CompletableFuture<ConsumeStats> getConsumeStats(String address,
GetConsumeStatsRequestHeader requestHeader,
+ long timeoutMillis);
+
+ CompletableFuture<GroupList> queryTopicConsumeByWho(String address,
+ QueryTopicConsumeByWhoRequestHeader requestHeader, long timeoutMillis);
+
+ CompletableFuture<ConsumerRunningInfo> getConsumerRunningInfo(String
address,
+ GetConsumerRunningInfoRequestHeader requestHeader, long timeoutMillis);
+
+ CompletableFuture<ConsumeMessageDirectlyResult>
consumeMessageDirectly(String address,
+ ConsumeMessageDirectlyResultRequestHeader requestHeader, long
timeoutMillis);
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImpl.java
new file mode 100644
index 000000000..34f066c7d
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImpl.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.impl.admin;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.MqClientAdmin;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
+import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.GroupList;
+import org.apache.rocketmq.remoting.protocol.body.QueryConsumeTimeSpanBody;
+import
org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody;
+import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.remoting.protocol.body.TopicList;
+import
org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.QueryConsumeTimeSpanRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader;
+import
org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteTopicFromNamesrvRequestHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+
+public class MqClientAdminImpl implements MqClientAdmin {
+ private final static Logger log =
LoggerFactory.getLogger(MqClientAdminImpl.class);
+ private final RemotingClient remotingClient;
+
+ public MqClientAdminImpl(RemotingClient remotingClient) {
+ this.remotingClient = remotingClient;
+ }
+
+ @Override
+ public CompletableFuture<List<MessageExt>> queryMessage(String address,
boolean uniqueKeyFlag, boolean decompressBody,
+ QueryMessageRequestHeader requestHeader, long timeoutMillis) {
+ CompletableFuture<List<MessageExt>> future = new CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, requestHeader);
+ request.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG,
String.valueOf(uniqueKeyFlag));
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ List<MessageExt> wrappers =
MessageDecoder.decodesBatch(ByteBuffer.wrap(response.getBody()), true,
decompressBody, true);
+ future.complete(filterMessages(wrappers,
requestHeader.getTopic(), requestHeader.getKey(), uniqueKeyFlag));
+ } else if (response.getCode() == ResponseCode.QUERY_NOT_FOUND) {
+ List<MessageExt> wrappers = new ArrayList<>();
+ future.complete(wrappers);
+ } else {
+ log.warn("queryMessage getResponseCommand failed, {} {},
header={}", response.getCode(), response.getRemark(), requestHeader);
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<TopicStatsTable> getTopicStatsInfo(String address,
+ GetTopicStatsInfoRequestHeader requestHeader, long timeoutMillis) {
+ CompletableFuture<TopicStatsTable> future = new CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ TopicStatsTable topicStatsTable =
TopicStatsTable.decode(response.getBody(), TopicStatsTable.class);
+ future.complete(topicStatsTable);
+ } else {
+ log.warn("getTopicStatsInfo getResponseCommand failed, {} {},
header={}", response.getCode(), response.getRemark(), requestHeader);
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<List<QueueTimeSpan>> queryConsumeTimeSpan(String
address,
+ QueryConsumeTimeSpanRequestHeader requestHeader, long timeoutMillis) {
+ CompletableFuture<List<QueueTimeSpan>> future = new
CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ QueryConsumeTimeSpanBody consumeTimeSpanBody =
GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
+ future.complete(consumeTimeSpanBody.getConsumeTimeSpanSet());
+ } else {
+ log.warn("queryConsumerTimeSpan getResponseCommand failed, {}
{}, header={}", response.getCode(), response.getRemark(), requestHeader);
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> updateOrCreateTopic(String address,
CreateTopicRequestHeader requestHeader,
+ long timeoutMillis) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ future.complete(null);
+ } else {
+ log.warn("updateOrCreateTopic getResponseCommand failed, {}
{}, header={}", response.getCode(), response.getRemark(), requestHeader);
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> updateOrCreateSubscriptionGroup(String
address, SubscriptionGroupConfig config,
+ long timeoutMillis) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP,
null);
+ byte[] body = RemotingSerializable.encode(config);
+ request.setBody(body);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ future.complete(null);
+ } else {
+ log.warn("updateOrCreateSubscriptionGroup getResponseCommand
failed, {} {}, header={}", response.getCode(), response.getRemark(), config);
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteTopicInBroker(String address,
DeleteTopicRequestHeader requestHeader,
+ long timeoutMillis) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ future.complete(null);
+ } else {
+ log.warn("deleteTopicInBroker getResponseCommand failed, {}
{}, header={}", response.getCode(), response.getRemark(), requestHeader);
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteTopicInNameserver(String address,
DeleteTopicFromNamesrvRequestHeader requestHeader,
+ long timeoutMillis) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ future.complete(null);
+ } else {
+ log.warn("deleteTopicInNameserver getResponseCommand failed,
{} {}, header={}", response.getCode(), response.getRemark(), requestHeader);
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteKvConfig(String address,
DeleteKVConfigRequestHeader requestHeader,
+ long timeoutMillis) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.DELETE_KV_CONFIG,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ future.complete(null);
+ } else {
+ log.warn("deleteKvConfig getResponseCommand failed, {} {},
header={}", response.getCode(), response.getRemark(), requestHeader);
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> deleteSubscriptionGroup(String address,
DeleteSubscriptionGroupRequestHeader requestHeader,
+ long timeoutMillis) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ future.complete(null);
+ } else {
+ log.warn("deleteSubscriptionGroup getResponseCommand failed,
{} {}, header={}", response.getCode(), response.getRemark(), requestHeader);
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Map<MessageQueue, Long>>
invokeBrokerToResetOffset(String address,
+ ResetOffsetRequestHeader requestHeader, long timeoutMillis) {
+ CompletableFuture<Map<MessageQueue, Long>> future = new
CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS && null !=
response.getBody()) {
+ Map<MessageQueue, Long> offsetTable =
ResetOffsetBody.decode(response.getBody(),
ResetOffsetBody.class).getOffsetTable();
+ future.complete(offsetTable);
+ log.info("Invoke broker to reset offset success. address:{},
header:{}, offsetTable:{}",
+ address, requestHeader, offsetTable);
+ } else {
+ log.warn("invokeBrokerToResetOffset getResponseCommand failed,
{} {}, header={}", response.getCode(), response.getRemark(), requestHeader);
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<MessageExt> viewMessage(String address,
ViewMessageRequestHeader requestHeader,
+ long timeoutMillis) {
+ CompletableFuture<MessageExt> future = new CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
+ MessageExt messageExt =
MessageDecoder.clientDecode(byteBuffer, true);
+ future.complete(messageExt);
+ } else {
+ log.warn("viewMessage getResponseCommand failed, {} {},
header={}", response.getCode(), response.getRemark(), requestHeader);
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<ClusterInfo> getBrokerClusterInfo(String address,
long timeoutMillis) {
+ CompletableFuture<ClusterInfo> future = new CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ ClusterInfo clusterInfo =
ClusterInfo.decode(response.getBody(), ClusterInfo.class);
+ future.complete(clusterInfo);
+ } else {
+ log.warn("getBrokerClusterInfo getResponseCommand failed, {}
{}", response.getCode(), response.getRemark());
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<ConsumerConnection>
getConsumerConnectionList(String address,
+ GetConsumerConnectionListRequestHeader requestHeader, long
timeoutMillis) {
+ CompletableFuture<ConsumerConnection> future = new
CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ ConsumerConnection consumerConnection =
ConsumerConnection.decode(response.getBody(), ConsumerConnection.class);
+ future.complete(consumerConnection);
+ } else {
+ log.warn("getConsumerConnectionList getResponseCommand failed,
{} {}", response.getCode(), response.getRemark());
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<TopicList> queryTopicsByConsumer(String address,
+ QueryTopicsByConsumerRequestHeader requestHeader, long timeoutMillis) {
+ CompletableFuture<TopicList> future = new CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPICS_BY_CONSUMER,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ TopicList topicList = TopicList.decode(response.getBody(),
TopicList.class);
+ future.complete(topicList);
+ } else {
+ log.warn("queryTopicsByConsumer getResponseCommand failed, {}
{}", response.getCode(), response.getRemark());
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<SubscriptionData>
querySubscriptionByConsumer(String address,
+ QuerySubscriptionByConsumerRequestHeader requestHeader, long
timeoutMillis) {
+ CompletableFuture<SubscriptionData> future = new CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_SUBSCRIPTION_BY_CONSUMER,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ QuerySubscriptionResponseBody subscriptionResponseBody =
+ QuerySubscriptionResponseBody.decode(response.getBody(),
QuerySubscriptionResponseBody.class);
+
future.complete(subscriptionResponseBody.getSubscriptionData());
+ } else {
+ log.warn("querySubscriptionByConsumer getResponseCommand
failed, {} {}", response.getCode(), response.getRemark());
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<ConsumeStats> getConsumeStats(String address,
GetConsumeStatsRequestHeader requestHeader,
+ long timeoutMillis) {
+ CompletableFuture<ConsumeStats> future = new CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ ConsumeStats consumeStats =
ConsumeStats.decode(response.getBody(), ConsumeStats.class);
+ future.complete(consumeStats);
+ } else {
+ log.warn("getConsumeStats getResponseCommand failed, {} {}",
response.getCode(), response.getRemark());
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<GroupList> queryTopicConsumeByWho(String address,
+ QueryTopicConsumeByWhoRequestHeader requestHeader, long timeoutMillis)
{
+ CompletableFuture<GroupList> future = new CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ GroupList groupList = GroupList.decode(response.getBody(),
GroupList.class);
+ future.complete(groupList);
+ } else {
+ log.warn("queryTopicConsumeByWho getResponseCommand failed, {}
{}", response.getCode(), response.getRemark());
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<ConsumerRunningInfo>
getConsumerRunningInfo(String address,
+ GetConsumerRunningInfoRequestHeader requestHeader, long timeoutMillis)
{
+ CompletableFuture<ConsumerRunningInfo> future = new
CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ ConsumerRunningInfo info =
ConsumerRunningInfo.decode(response.getBody(), ConsumerRunningInfo.class);
+ future.complete(info);
+ } else {
+ log.warn("getConsumerRunningInfo getResponseCommand failed, {}
{}", response.getCode(), response.getRemark());
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<ConsumeMessageDirectlyResult>
consumeMessageDirectly(String address,
+ ConsumeMessageDirectlyResultRequestHeader requestHeader, long
timeoutMillis) {
+ CompletableFuture<ConsumeMessageDirectlyResult> future = new
CompletableFuture<>();
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY,
requestHeader);
+ remotingClient.invoke(address, request,
timeoutMillis).thenAccept(response -> {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ ConsumeMessageDirectlyResult info =
ConsumeMessageDirectlyResult.decode(response.getBody(),
ConsumeMessageDirectlyResult.class);
+ future.complete(info);
+ } else {
+ log.warn("consumeMessageDirectly getResponseCommand failed, {}
{}", response.getCode(), response.getRemark());
+ future.completeExceptionally(new
MQClientException(response.getCode(), response.getRemark()));
+ }
+ });
+ return future;
+ }
+
+ private List<MessageExt> filterMessages(List<MessageExt> messageFoundList,
String topic, String key,
+ boolean uniqueKeyFlag) {
+ List<MessageExt> matchedMessages = new ArrayList<>();
+ if (uniqueKeyFlag) {
+ matchedMessages.addAll(messageFoundList.stream()
+ .filter(msg -> topic.equals(msg.getTopic()))
+ .filter(msg -> key.equals(msg.getMsgId()))
+ .collect(Collectors.toList())
+ );
+ } else {
+ matchedMessages.addAll(messageFoundList.stream()
+ .filter(msg -> topic.equals(msg.getTopic()))
+ .filter(msg -> {
+ boolean matched = false;
+ if (StringUtils.isNotBlank(msg.getKeys())) {
+ String[] keyArray =
msg.getKeys().split(MessageConst.KEY_SEPARATOR);
+ for (String s : keyArray) {
+ if (key.equals(s)) {
+ matched = true;
+ break;
+ }
+ }
+ }
+
+ return matched;
+ }).collect(Collectors.toList()));
+ }
+
+ return matchedMessages;
+ }
+}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java
index ec81e815c..cc8252c2e 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java
@@ -35,6 +35,7 @@ import org.apache.rocketmq.client.exception.OffsetNotFoundException;
import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
import org.apache.rocketmq.client.impl.CommunicationMode;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.admin.MqClientAdminImpl;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -83,6 +84,8 @@ public class MQClientAPIExt extends MQClientAPIImpl {
private final ClientConfig clientConfig;
+ private MqClientAdminImpl mqClientAdmin;
+
public MQClientAPIExt(
ClientConfig clientConfig,
NettyClientConfig nettyClientConfig,
@@ -91,6 +94,7 @@ public class MQClientAPIExt extends MQClientAPIImpl {
) {
super(nettyClientConfig, clientRemotingProcessor, rpcHook,
clientConfig);
this.clientConfig = clientConfig;
+ this.mqClientAdmin = new MqClientAdminImpl(getRemotingClient());
}
public boolean updateNameServerAddressList() {
@@ -621,20 +625,7 @@ public class MQClientAPIExt extends MQClientAPIImpl {
}
public CompletableFuture<RemotingCommand> invoke(String brokerAddr,
RemotingCommand request, long timeoutMillis) {
- CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
- try {
- this.getRemotingClient().invokeAsync(brokerAddr, request,
timeoutMillis, responseFuture -> {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (response != null) {
- future.complete(response);
- } else {
-
future.completeExceptionally(processNullResponseErr(responseFuture));
- }
- });
- } catch (Exception e) {
- future.completeExceptionally(e);
- }
- return future;
+ return getRemotingClient().invoke(brokerAddr, request, timeoutMillis); // callback-to-future bridging now lives in the remoting client's invoke(addr, request, timeoutMillis)
}
public CompletableFuture<Void> invokeOneway(String brokerAddr,
RemotingCommand request, long timeoutMillis) {
@@ -647,4 +638,8 @@ public class MQClientAPIExt extends MQClientAPIImpl {
}
return future;
}
+
+    /**
+     * Exposes the admin client held by this instance.
+     *
+     * @return the {@code MqClientAdminImpl} stored in the {@code mqClientAdmin} field
+     */
+    public MqClientAdminImpl getMqClientAdmin() {
+        return this.mqClientAdmin;
+    }
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index 5c3766b2d..ff0b3df95 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -17,8 +17,10 @@
package org.apache.rocketmq.remoting;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
@@ -45,6 +47,30 @@ public interface RemotingClient extends RemotingService {
throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException;
+    /**
+     * Future-based wrapper around {@code invokeAsync}.
+     *
+     * <p>The returned future completes with the response command when the callback delivers one.
+     * When the callback delivers no response, the future completes exceptionally with:
+     * <ul>
+     *   <li>{@code RemotingSendRequestException} if the request was not sent successfully;</li>
+     *   <li>{@code RemotingTimeoutException} if the response future reports a timeout;</li>
+     *   <li>{@code RemotingException} (carrying {@code request.toString()}) otherwise.</li>
+     * </ul>
+     * Anything thrown synchronously by {@code invokeAsync} also completes the future exceptionally,
+     * so this method itself does not throw.
+     *
+     * @param addr          target address passed through to {@code invokeAsync}
+     * @param request       command to send
+     * @param timeoutMillis timeout passed through to {@code invokeAsync}
+     * @return a future that is completed from the {@code invokeAsync} callback
+     */
+    default CompletableFuture<RemotingCommand> invoke(final String addr, final RemotingCommand request,
+        final long timeoutMillis) {
+        final CompletableFuture<RemotingCommand> result = new CompletableFuture<>();
+        try {
+            invokeAsync(addr, request, timeoutMillis, responseFuture -> {
+                final RemotingCommand reply = responseFuture.getResponseCommand();
+                if (reply != null) {
+                    result.complete(reply);
+                    return;
+                }
+
+                // No reply: classify the failure in the same priority order (send failure, timeout, other).
+                final Throwable failure;
+                if (!responseFuture.isSendRequestOK()) {
+                    failure = new RemotingSendRequestException(addr, responseFuture.getCause());
+                } else if (responseFuture.isTimeout()) {
+                    failure = new RemotingTimeoutException(addr, timeoutMillis, responseFuture.getCause());
+                } else {
+                    failure = new RemotingException(request.toString(), responseFuture.getCause());
+                }
+                result.completeExceptionally(failure);
+            });
+        } catch (Throwable t) {
+            result.completeExceptionally(t);
+        }
+        return result;
+    }
+
void registerProcessor(final int requestCode, final NettyRequestProcessor
processor,
final ExecutorService executor);
diff --git
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
index 4b38ce952..efa3eb3d5 100644
---
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
+++
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
@@ -16,23 +16,111 @@
*/
package org.apache.rocketmq.remoting.netty;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
@RunWith(MockitoJUnitRunner.class)
public class NettyRemotingClientTest {
+ @Spy
private NettyRemotingClient remotingClient = new NettyRemotingClient(new
NettyClientConfig());
@Test
public void testSetCallbackExecutor() throws NoSuchFieldException,
IllegalAccessException {
ExecutorService customized = Executors.newCachedThreadPool(); // NOTE(review): this pool is not shut down anywhere in the test
remotingClient.setCallbackExecutor(customized); // install the custom executor on the client under test
-
assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
}
+
+    /**
+     * When the invokeAsync callback carries a response command, the future returned by
+     * invoke must complete with that same command.
+     */
+    @Test
+    public void testInvokeResponse() throws Exception {
+        RemotingCommand pullRequest = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+        RemotingCommand expected = RemotingCommand.createResponseCommand(null);
+        expected.setCode(ResponseCode.SUCCESS);
+
+        // Stub invokeAsync on the spy so it fires the callback inline with a populated ResponseFuture.
+        doAnswer(invocation -> {
+            ResponseFuture answered = new ResponseFuture(null, pullRequest.getOpaque(), 3 * 1000, null, null);
+            answered.setResponseCommand(expected);
+            InvokeCallback callback = invocation.getArgument(3);
+            callback.operationComplete(answered);
+            return null;
+        }).when(remotingClient)
+            .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+
+        CompletableFuture<RemotingCommand> future = remotingClient.invoke("0.0.0.0", pullRequest, 1000);
+        assertThat(future.get()).isEqualTo(expected);
+    }
+
+    /**
+     * When the callback delivers no response and the ResponseFuture is flagged as
+     * "send request not OK", the future returned by invoke must fail with a
+     * RemotingSendRequestException (seen as the cause of the exception thrown by get()).
+     */
+    @Test
+    public void testRemotingSendRequestException() throws Exception {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+
+        // Stub invokeAsync on the spy: fire the callback inline with a future that has no
+        // response command and whose send is marked as failed.
+        doAnswer(invocation -> {
+            InvokeCallback callback = invocation.getArgument(3);
+            ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
+            responseFuture.setSendRequestOK(false);
+            callback.operationComplete(responseFuture);
+            return null;
+        }).when(remotingClient)
+            .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+
+        CompletableFuture<RemotingCommand> future = remotingClient.invoke("0.0.0.0", request, 1000);
+        Throwable thrown = catchThrowable(future::get);
+        assertThat(thrown.getCause()).isInstanceOf(RemotingSendRequestException.class);
+    }
+
+    /**
+     * When the callback delivers no response and the ResponseFuture reports a timeout,
+     * the future returned by invoke must fail with a RemotingTimeoutException
+     * (seen as the cause of the exception thrown by get()).
+     */
+    @Test
+    public void testRemotingTimeoutException() throws Exception {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+
+        doAnswer(invocation -> {
+            InvokeCallback callback = invocation.getArgument(3);
+            // Timeout of -1 ms: presumably makes isTimeout() true immediately, while the send flag
+            // keeps its default - TODO confirm against ResponseFuture.
+            ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), -1L, null, null);
+            callback.operationComplete(responseFuture);
+            return null;
+        }).when(remotingClient)
+            .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+
+        CompletableFuture<RemotingCommand> future = remotingClient.invoke("0.0.0.0", request, 1000);
+        Throwable thrown = catchThrowable(future::get);
+        assertThat(thrown.getCause()).isInstanceOf(RemotingTimeoutException.class);
+    }
+
+    /**
+     * When the callback delivers no response and the ResponseFuture is left in its freshly
+     * constructed state (3 s timeout, nothing else set), the future returned by invoke must
+     * fail with a RemotingException (seen as the cause of the exception thrown by get()).
+     */
+    @Test
+    public void testRemotingException() throws Exception {
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+
+        doAnswer(invocation -> {
+            InvokeCallback callback = invocation.getArgument(3);
+            ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
+            callback.operationComplete(responseFuture);
+            return null;
+        }).when(remotingClient)
+            .invokeAsync(anyString(), any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+
+        CompletableFuture<RemotingCommand> future = remotingClient.invoke("0.0.0.0", request, 1000);
+        Throwable thrown = catchThrowable(future::get);
+        // NOTE(review): isInstanceOf also accepts subclasses of RemotingException; if the send/timeout
+        // exceptions extend it, this does not pin the generic branch specifically - confirm.
+        assertThat(thrown.getCause()).isInstanceOf(RemotingException.class);
+    }
}