This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 23a8ed490 [ISSUE #6644] Add admin client future interface (#6646)
23a8ed490 is described below

commit 23a8ed490fd8e6e960914b11af127dfce323fd22
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Fri Apr 28 17:34:16 2023 +0800

    [ISSUE #6644] Add admin client future interface (#6646)
    
    * [ISSUE #6644] Add admin client future interface
    
    * add unit test
    
    * Change exception to throwable
    
    * Add interface for MqClientAdmin
---
 .../org/apache/rocketmq/client/MqClientAdmin.java  | 110 ++++++
 .../client/impl/admin/MqClientAdminImpl.java       | 438 +++++++++++++++++++++
 .../proxy/service/mqclient/MQClientAPIExt.java     |  23 +-
 .../apache/rocketmq/remoting/RemotingClient.java   |  26 ++
 .../remoting/netty/NettyRemotingClientTest.java    |  90 ++++-
 5 files changed, 672 insertions(+), 15 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/MqClientAdmin.java 
b/client/src/main/java/org/apache/rocketmq/client/MqClientAdmin.java
new file mode 100644
index 000000000..4eb74c0ca
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/MqClientAdmin.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
+import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.GroupList;
+import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.remoting.protocol.body.TopicList;
+import 
org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.QueryConsumeTimeSpanRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteTopicFromNamesrvRequestHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+
+public interface MqClientAdmin {
+    CompletableFuture<List<MessageExt>> queryMessage(String address, boolean 
uniqueKeyFlag, boolean decompressBody,
+        QueryMessageRequestHeader requestHeader, long timeoutMillis);
+
+    CompletableFuture<TopicStatsTable> getTopicStatsInfo(String address,
+        GetTopicStatsInfoRequestHeader requestHeader, long timeoutMillis);
+
+    CompletableFuture<List<QueueTimeSpan>> queryConsumeTimeSpan(String address,
+        QueryConsumeTimeSpanRequestHeader requestHeader, long timeoutMillis);
+
+    CompletableFuture<Void> updateOrCreateTopic(String address, 
CreateTopicRequestHeader requestHeader,
+        long timeoutMillis);
+
+    CompletableFuture<Void> updateOrCreateSubscriptionGroup(String address, 
SubscriptionGroupConfig config,
+        long timeoutMillis);
+
+    CompletableFuture<Void> deleteTopicInBroker(String address, 
DeleteTopicRequestHeader requestHeader,
+        long timeoutMillis);
+
+    CompletableFuture<Void> deleteTopicInNameserver(String address, 
DeleteTopicFromNamesrvRequestHeader requestHeader,
+        long timeoutMillis);
+
+    CompletableFuture<Void> deleteKvConfig(String address, 
DeleteKVConfigRequestHeader requestHeader,
+        long timeoutMillis);
+
+    CompletableFuture<Void> deleteSubscriptionGroup(String address, 
DeleteSubscriptionGroupRequestHeader requestHeader,
+        long timeoutMillis);
+
+    CompletableFuture<Map<MessageQueue, Long>> 
invokeBrokerToResetOffset(String address,
+        ResetOffsetRequestHeader requestHeader, long timeoutMillis);
+
+    CompletableFuture<MessageExt> viewMessage(String address, 
ViewMessageRequestHeader requestHeader,
+        long timeoutMillis);
+
+    CompletableFuture<ClusterInfo> getBrokerClusterInfo(String address, long 
timeoutMillis);
+
+    CompletableFuture<ConsumerConnection> getConsumerConnectionList(String 
address,
+        GetConsumerConnectionListRequestHeader requestHeader, long 
timeoutMillis);
+
+    CompletableFuture<TopicList> queryTopicsByConsumer(String address,
+        QueryTopicsByConsumerRequestHeader requestHeader, long timeoutMillis);
+
+    CompletableFuture<SubscriptionData> querySubscriptionByConsumer(String 
address,
+        QuerySubscriptionByConsumerRequestHeader requestHeader, long 
timeoutMillis);
+
+    CompletableFuture<ConsumeStats> getConsumeStats(String address, 
GetConsumeStatsRequestHeader requestHeader,
+        long timeoutMillis);
+
+    CompletableFuture<GroupList> queryTopicConsumeByWho(String address,
+        QueryTopicConsumeByWhoRequestHeader requestHeader, long timeoutMillis);
+
+    CompletableFuture<ConsumerRunningInfo> getConsumerRunningInfo(String 
address,
+        GetConsumerRunningInfoRequestHeader requestHeader, long timeoutMillis);
+
+    CompletableFuture<ConsumeMessageDirectlyResult> 
consumeMessageDirectly(String address,
+        ConsumeMessageDirectlyResultRequestHeader requestHeader, long 
timeoutMillis);
+}
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImpl.java
new file mode 100644
index 000000000..34f066c7d
--- /dev/null
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImpl.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.impl.admin;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.MqClientAdmin;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
+import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
+import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
+import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
+import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.remoting.protocol.body.GroupList;
+import org.apache.rocketmq.remoting.protocol.body.QueryConsumeTimeSpanBody;
+import 
org.apache.rocketmq.remoting.protocol.body.QuerySubscriptionResponseBody;
+import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.remoting.protocol.body.TopicList;
+import 
org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.QueryConsumeTimeSpanRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader;
+import 
org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteTopicFromNamesrvRequestHeader;
+import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
+import 
org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
+
+public class MqClientAdminImpl implements MqClientAdmin {
+    private final static Logger log = 
LoggerFactory.getLogger(MqClientAdminImpl.class);
+    private final RemotingClient remotingClient;
+
+    public MqClientAdminImpl(RemotingClient remotingClient) {
+        this.remotingClient = remotingClient;
+    }
+
+    @Override
+    public CompletableFuture<List<MessageExt>> queryMessage(String address, 
boolean uniqueKeyFlag, boolean decompressBody,
+        QueryMessageRequestHeader requestHeader, long timeoutMillis) {
+        CompletableFuture<List<MessageExt>> future = new CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, requestHeader);
+        request.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG, 
String.valueOf(uniqueKeyFlag));
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                List<MessageExt> wrappers = 
MessageDecoder.decodesBatch(ByteBuffer.wrap(response.getBody()), true, 
decompressBody, true);
+                future.complete(filterMessages(wrappers, 
requestHeader.getTopic(), requestHeader.getKey(), uniqueKeyFlag));
+            } else if (response.getCode() == ResponseCode.QUERY_NOT_FOUND)  {
+                List<MessageExt> wrappers = new ArrayList<>();
+                future.complete(wrappers);
+            } else {
+                log.warn("queryMessage getResponseCommand failed, {} {}, 
header={}", response.getCode(), response.getRemark(), requestHeader);
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<TopicStatsTable> getTopicStatsInfo(String address,
+        GetTopicStatsInfoRequestHeader requestHeader, long timeoutMillis) {
+        CompletableFuture<TopicStatsTable> future = new CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, 
requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                TopicStatsTable topicStatsTable = 
TopicStatsTable.decode(response.getBody(), TopicStatsTable.class);
+                future.complete(topicStatsTable);
+            } else {
+                log.warn("getTopicStatsInfo getResponseCommand failed, {} {}, 
header={}", response.getCode(), response.getRemark(), requestHeader);
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<List<QueueTimeSpan>> queryConsumeTimeSpan(String 
address,
+        QueryConsumeTimeSpanRequestHeader requestHeader, long timeoutMillis) {
+        CompletableFuture<List<QueueTimeSpan>> future = new 
CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, 
requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                QueryConsumeTimeSpanBody consumeTimeSpanBody = 
GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
+                future.complete(consumeTimeSpanBody.getConsumeTimeSpanSet());
+            } else {
+                log.warn("queryConsumerTimeSpan getResponseCommand failed, {} 
{}, header={}", response.getCode(), response.getRemark(), requestHeader);
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Void> updateOrCreateTopic(String address, 
CreateTopicRequestHeader requestHeader,
+        long timeoutMillis) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, 
requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                future.complete(null);
+            } else {
+                log.warn("updateOrCreateTopic getResponseCommand failed, {} 
{}, header={}", response.getCode(), response.getRemark(), requestHeader);
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Void> updateOrCreateSubscriptionGroup(String 
address, SubscriptionGroupConfig config,
+        long timeoutMillis) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP,
 null);
+        byte[] body = RemotingSerializable.encode(config);
+        request.setBody(body);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                future.complete(null);
+            } else {
+                log.warn("updateOrCreateSubscriptionGroup getResponseCommand 
failed, {} {}, header={}", response.getCode(), response.getRemark(), config);
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteTopicInBroker(String address, 
DeleteTopicRequestHeader requestHeader,
+        long timeoutMillis) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, 
requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                future.complete(null);
+            } else {
+                log.warn("deleteTopicInBroker getResponseCommand failed, {} 
{}, header={}", response.getCode(), response.getRemark(), requestHeader);
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteTopicInNameserver(String address, 
DeleteTopicFromNamesrvRequestHeader requestHeader,
+        long timeoutMillis) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, 
requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                future.complete(null);
+            } else {
+                log.warn("deleteTopicInNameserver getResponseCommand failed, 
{} {}, header={}", response.getCode(), response.getRemark(), requestHeader);
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteKvConfig(String address, 
DeleteKVConfigRequestHeader requestHeader,
+        long timeoutMillis) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.DELETE_KV_CONFIG, 
requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                future.complete(null);
+            } else {
+                log.warn("deleteKvConfig getResponseCommand failed, {} {}, 
header={}", response.getCode(), response.getRemark(), requestHeader);
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Void> deleteSubscriptionGroup(String address, 
DeleteSubscriptionGroupRequestHeader requestHeader,
+        long timeoutMillis) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, 
requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                future.complete(null);
+            } else {
+                log.warn("deleteSubscriptionGroup getResponseCommand failed, 
{} {}, header={}", response.getCode(), response.getRemark(), requestHeader);
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Map<MessageQueue, Long>> 
invokeBrokerToResetOffset(String address,
+        ResetOffsetRequestHeader requestHeader, long timeoutMillis) {
+        CompletableFuture<Map<MessageQueue, Long>> future = new 
CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, 
requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS && null != 
response.getBody()) {
+                Map<MessageQueue, Long> offsetTable = 
ResetOffsetBody.decode(response.getBody(), 
ResetOffsetBody.class).getOffsetTable();
+                future.complete(offsetTable);
+                log.info("Invoke broker to reset offset success. address:{}, 
header:{}, offsetTable:{}",
+                    address, requestHeader, offsetTable);
+            } else {
+                log.warn("invokeBrokerToResetOffset getResponseCommand failed, 
{} {}, header={}", response.getCode(), response.getRemark(), requestHeader);
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<MessageExt> viewMessage(String address, 
ViewMessageRequestHeader requestHeader,
+        long timeoutMillis) {
+        CompletableFuture<MessageExt> future = new CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, 
requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBody());
+                MessageExt messageExt = 
MessageDecoder.clientDecode(byteBuffer, true);
+                future.complete(messageExt);
+            } else {
+                log.warn("viewMessage getResponseCommand failed, {} {}, 
header={}", response.getCode(), response.getRemark(), requestHeader);
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<ClusterInfo> getBrokerClusterInfo(String address, 
long timeoutMillis) {
+        CompletableFuture<ClusterInfo> future = new CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                ClusterInfo clusterInfo = 
ClusterInfo.decode(response.getBody(), ClusterInfo.class);
+                future.complete(clusterInfo);
+            } else {
+                log.warn("getBrokerClusterInfo getResponseCommand failed, {} 
{}", response.getCode(), response.getRemark());
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<ConsumerConnection> 
getConsumerConnectionList(String address,
+        GetConsumerConnectionListRequestHeader requestHeader, long 
timeoutMillis) {
+        CompletableFuture<ConsumerConnection> future = new 
CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST, 
requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                ConsumerConnection consumerConnection = 
ConsumerConnection.decode(response.getBody(), ConsumerConnection.class);
+                future.complete(consumerConnection);
+            } else {
+                log.warn("getConsumerConnectionList getResponseCommand failed, 
{} {}", response.getCode(), response.getRemark());
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<TopicList> queryTopicsByConsumer(String address,
+        QueryTopicsByConsumerRequestHeader requestHeader, long timeoutMillis) {
+        CompletableFuture<TopicList> future = new CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPICS_BY_CONSUMER, 
requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                TopicList topicList = TopicList.decode(response.getBody(), 
TopicList.class);
+                future.complete(topicList);
+            } else {
+                log.warn("queryTopicsByConsumer getResponseCommand failed, {} 
{}", response.getCode(), response.getRemark());
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<SubscriptionData> 
querySubscriptionByConsumer(String address,
+        QuerySubscriptionByConsumerRequestHeader requestHeader, long 
timeoutMillis) {
+        CompletableFuture<SubscriptionData> future = new CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_SUBSCRIPTION_BY_CONSUMER,
 requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                QuerySubscriptionResponseBody subscriptionResponseBody =
+                    QuerySubscriptionResponseBody.decode(response.getBody(), 
QuerySubscriptionResponseBody.class);
+                
future.complete(subscriptionResponseBody.getSubscriptionData());
+            } else {
+                log.warn("querySubscriptionByConsumer getResponseCommand 
failed, {} {}", response.getCode(), response.getRemark());
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<ConsumeStats> getConsumeStats(String address, 
GetConsumeStatsRequestHeader requestHeader,
+        long timeoutMillis) {
+        CompletableFuture<ConsumeStats> future = new CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, 
requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                ConsumeStats consumeStats = 
ConsumeStats.decode(response.getBody(), ConsumeStats.class);
+                future.complete(consumeStats);
+            } else {
+                log.warn("getConsumeStats getResponseCommand failed, {} {}", 
response.getCode(), response.getRemark());
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<GroupList> queryTopicConsumeByWho(String address,
+        QueryTopicConsumeByWhoRequestHeader requestHeader, long timeoutMillis) 
{
+        CompletableFuture<GroupList> future = new CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO, 
requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                GroupList groupList = GroupList.decode(response.getBody(), 
GroupList.class);
+                future.complete(groupList);
+            } else {
+                log.warn("queryTopicConsumeByWho getResponseCommand failed, {} 
{}", response.getCode(), response.getRemark());
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<ConsumerRunningInfo> 
getConsumerRunningInfo(String address,
+        GetConsumerRunningInfoRequestHeader requestHeader, long timeoutMillis) 
{
+        CompletableFuture<ConsumerRunningInfo> future = new 
CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, 
requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                ConsumerRunningInfo info = 
ConsumerRunningInfo.decode(response.getBody(), ConsumerRunningInfo.class);
+                future.complete(info);
+            } else {
+                log.warn("getConsumerRunningInfo getResponseCommand failed, {} 
{}", response.getCode(), response.getRemark());
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<ConsumeMessageDirectlyResult> 
consumeMessageDirectly(String address,
+        ConsumeMessageDirectlyResultRequestHeader requestHeader, long 
timeoutMillis) {
+        CompletableFuture<ConsumeMessageDirectlyResult> future = new 
CompletableFuture<>();
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, 
requestHeader);
+        remotingClient.invoke(address, request, 
timeoutMillis).thenAccept(response -> {
+            if (response.getCode() == ResponseCode.SUCCESS) {
+                ConsumeMessageDirectlyResult info = 
ConsumeMessageDirectlyResult.decode(response.getBody(), 
ConsumeMessageDirectlyResult.class);
+                future.complete(info);
+            } else {
+                log.warn("consumeMessageDirectly getResponseCommand failed, {} 
{}", response.getCode(), response.getRemark());
+                future.completeExceptionally(new 
MQClientException(response.getCode(), response.getRemark()));
+            }
+        });
+        return future;
+    }
+
+    private List<MessageExt> filterMessages(List<MessageExt> messageFoundList, 
String topic, String key,
+        boolean uniqueKeyFlag) {
+        List<MessageExt> matchedMessages = new ArrayList<>();
+        if (uniqueKeyFlag) {
+            matchedMessages.addAll(messageFoundList.stream()
+                .filter(msg -> topic.equals(msg.getTopic()))
+                .filter(msg -> key.equals(msg.getMsgId()))
+                .collect(Collectors.toList())
+            );
+        } else {
+            matchedMessages.addAll(messageFoundList.stream()
+                .filter(msg -> topic.equals(msg.getTopic()))
+                .filter(msg -> {
+                    boolean matched = false;
+                    if (StringUtils.isNotBlank(msg.getKeys())) {
+                        String[] keyArray = 
msg.getKeys().split(MessageConst.KEY_SEPARATOR);
+                        for (String s : keyArray) {
+                            if (key.equals(s)) {
+                                matched = true;
+                                break;
+                            }
+                        }
+                    }
+
+                    return matched;
+                }).collect(Collectors.toList()));
+        }
+
+        return matchedMessages;
+    }
+}
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java
index ec81e815c..cc8252c2e 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java
@@ -35,6 +35,7 @@ import 
org.apache.rocketmq.client.exception.OffsetNotFoundException;
 import org.apache.rocketmq.client.impl.ClientRemotingProcessor;
 import org.apache.rocketmq.client.impl.CommunicationMode;
 import org.apache.rocketmq.client.impl.MQClientAPIImpl;
+import org.apache.rocketmq.client.impl.admin.MqClientAdminImpl;
 import org.apache.rocketmq.client.impl.consumer.PullResultExt;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -83,6 +84,8 @@ public class MQClientAPIExt extends MQClientAPIImpl {
 
     private final ClientConfig clientConfig;
 
+    private MqClientAdminImpl mqClientAdmin;
+
     public MQClientAPIExt(
         ClientConfig clientConfig,
         NettyClientConfig nettyClientConfig,
@@ -91,6 +94,7 @@ public class MQClientAPIExt extends MQClientAPIImpl {
     ) {
         super(nettyClientConfig, clientRemotingProcessor, rpcHook, 
clientConfig);
         this.clientConfig = clientConfig;
+        this.mqClientAdmin = new MqClientAdminImpl(getRemotingClient());
     }
 
     public boolean updateNameServerAddressList() {
@@ -621,20 +625,7 @@ public class MQClientAPIExt extends MQClientAPIImpl {
     }
 
     public CompletableFuture<RemotingCommand> invoke(String brokerAddr, 
RemotingCommand request, long timeoutMillis) {
-        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
-        try {
-            this.getRemotingClient().invokeAsync(brokerAddr, request, 
timeoutMillis, responseFuture -> {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (response != null) {
-                    future.complete(response);
-                } else {
-                    
future.completeExceptionally(processNullResponseErr(responseFuture));
-                }
-            });
-        } catch (Exception e) {
-            future.completeExceptionally(e);
-        }
-        return future;
+        return getRemotingClient().invoke(brokerAddr, request, timeoutMillis);
     }
 
     public CompletableFuture<Void> invokeOneway(String brokerAddr, 
RemotingCommand request, long timeoutMillis) {
@@ -647,4 +638,8 @@ public class MQClientAPIExt extends MQClientAPIImpl {
         }
         return future;
     }
+
+    public MqClientAdminImpl getMqClientAdmin() {
+        return mqClientAdmin;
+    }
 }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
index 5c3766b2d..ff0b3df95 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingClient.java
@@ -17,8 +17,10 @@
 package org.apache.rocketmq.remoting;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
@@ -45,6 +47,30 @@ public interface RemotingClient extends RemotingService {
         throws InterruptedException, RemotingConnectException, 
RemotingTooMuchRequestException,
         RemotingTimeoutException, RemotingSendRequestException;
 
+    default CompletableFuture<RemotingCommand> invoke(final String addr, final 
RemotingCommand request,
+        final long timeoutMillis) {
+        CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
+        try {
+            invokeAsync(addr, request, timeoutMillis, responseFuture -> {
+                RemotingCommand response = responseFuture.getResponseCommand();
+                if (response != null) {
+                    future.complete(response);
+                } else {
+                    if (!responseFuture.isSendRequestOK()) {
+                        future.completeExceptionally(new 
RemotingSendRequestException(addr, responseFuture.getCause()));
+                    } else if (responseFuture.isTimeout()) {
+                        future.completeExceptionally(new 
RemotingTimeoutException(addr, timeoutMillis, responseFuture.getCause()));
+                    } else {
+                        future.completeExceptionally(new 
RemotingException(request.toString(), responseFuture.getCause()));
+                    }
+                }
+            });
+        } catch (Throwable t) {
+            future.completeExceptionally(t);
+        }
+        return future;
+    }
+
     void registerProcessor(final int requestCode, final NettyRequestProcessor 
processor,
         final ExecutorService executor);
 
diff --git 
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
 
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
index 4b38ce952..efa3eb3d5 100644
--- 
a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
+++ 
b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingClientTest.java
@@ -16,23 +16,111 @@
  */
 package org.apache.rocketmq.remoting.netty;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import org.apache.rocketmq.remoting.InvokeCallback;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
 
 @RunWith(MockitoJUnitRunner.class)
 public class NettyRemotingClientTest {
+    @Spy
     private NettyRemotingClient remotingClient = new NettyRemotingClient(new 
NettyClientConfig());
 
     @Test
     public void testSetCallbackExecutor() throws NoSuchFieldException, 
IllegalAccessException {        
         ExecutorService customized = Executors.newCachedThreadPool();
         remotingClient.setCallbackExecutor(customized);
-
         assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
     }
+
+    @Test
+    public void testInvokeResponse() throws Exception {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        doAnswer(invocation -> {
+            InvokeCallback callback = invocation.getArgument(3);
+            ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
+            responseFuture.setResponseCommand(response);
+            callback.operationComplete(responseFuture);
+            return null;
+        }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+
+        CompletableFuture<RemotingCommand> future = 
remotingClient.invoke("0.0.0.0", request, 1000);
+        RemotingCommand actual = future.get();
+        assertThat(actual).isEqualTo(response);
+    }
+
+    @Test
+    public void testRemotingSendRequestException() throws Exception {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        doAnswer(invocation -> {
+            InvokeCallback callback = invocation.getArgument(3);
+            ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
+            responseFuture.setSendRequestOK(false);
+            callback.operationComplete(responseFuture);
+            return null;
+        }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+
+        CompletableFuture<RemotingCommand> future = 
remotingClient.invoke("0.0.0.0", request, 1000);
+        Throwable thrown = catchThrowable(future::get);
+        
assertThat(thrown.getCause()).isInstanceOf(RemotingSendRequestException.class);
+    }
+
+    @Test
+    public void testRemotingTimeoutException() throws Exception {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        doAnswer(invocation -> {
+            InvokeCallback callback = invocation.getArgument(3);
+            ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), -1L, null, null);
+            callback.operationComplete(responseFuture);
+            return null;
+        }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+
+        CompletableFuture<RemotingCommand> future = 
remotingClient.invoke("0.0.0.0", request, 1000);
+        Throwable thrown = catchThrowable(future::get);
+        
assertThat(thrown.getCause()).isInstanceOf(RemotingTimeoutException.class);
+    }
+
+    @Test
+    public void testRemotingException() throws Exception {
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, null);
+
+        RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        response.setCode(ResponseCode.SUCCESS);
+        doAnswer(invocation -> {
+            InvokeCallback callback = invocation.getArgument(3);
+            ResponseFuture responseFuture = new ResponseFuture(null, 
request.getOpaque(), 3 * 1000, null, null);
+            callback.operationComplete(responseFuture);
+            return null;
+        }).when(remotingClient).invokeAsync(anyString(), 
any(RemotingCommand.class), anyLong(), any(InvokeCallback.class));
+
+        CompletableFuture<RemotingCommand> future = 
remotingClient.invoke("0.0.0.0", request, 1000);
+        Throwable thrown = catchThrowable(future::get);
+        assertThat(thrown.getCause()).isInstanceOf(RemotingException.class);
+    }
 }


Reply via email to