This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 86b3ff7b102eeba4f12c837f64c90bb033d90717
Author: dongeforever <[email protected]>
AuthorDate: Tue Nov 16 15:40:28 2021 +0800

    Finish the processor
---
 .../broker/processor/AdminBrokerProcessor.java     | 22 ++++++++++++++++------
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 9f525f6..0a8f58c 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -646,8 +646,10 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
                     requestHeader.setPhysical(true);
                     requestHeader.setTimestamp(timestamp);
                     requestHeader.setQueueId(item.getQueueId());
-                    RpcRequest rpcRequest = new 
RpcRequest(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader, null);
-                    RpcResponse rpcResponse = 
this.brokerController.getBrokerOuterAPI().searchOffset(item.getBname(), 
rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+                    
requestHeader.setCode(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP);
+                    requestHeader.setBname(item.getBname());
+                    RpcRequest rpcRequest = new RpcRequest(requestHeader, 
null);
+                    RpcResponse rpcResponse = 
this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, 
this.brokerController.getBrokerConfig().getForwardTimeout()).get();
                     if (rpcResponse.getException() != null) {
                         throw rpcResponse.getException();
                     }
@@ -751,8 +753,12 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
             return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d does not exit in request process of current broker %s", 
mappingContext.getTopic(), mappingContext.getGlobalId(), 
mappingDetail.getBname()));
         };
         try {
-            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, 
requestHeader, null);
-            RpcResponse rpcResponse = 
this.brokerController.getBrokerOuterAPI().getMinOffset(mappingItem.getBname(), 
rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+            requestHeader.setCode(RequestCode.GET_MIN_OFFSET);
+            requestHeader.setBname(mappingItem.getBname());
+            requestHeader.setPhysical(true);
+            //TODO check if it is leader
+            RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
+            RpcResponse rpcResponse = 
this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, 
this.brokerController.getBrokerConfig().getForwardTimeout()).get();
             if (rpcResponse.getException() != null) {
                 throw rpcResponse.getException();
             }
@@ -802,8 +808,12 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
             return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d does not exit in request process of current broker %s", 
mappingContext.getTopic(), mappingContext.getGlobalId(), 
mappingDetail.getBname()));
         };
         try {
-            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, 
requestHeader, null);
-            RpcResponse rpcResponse = 
this.brokerController.getBrokerOuterAPI().getEarliestMsgStoretime(mappingItem.getBname(),
 rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout());
+            requestHeader.setCode(RequestCode.GET_MIN_OFFSET);
+            requestHeader.setBname(mappingItem.getBname());
+            requestHeader.setPhysical(true);
+            RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
+            //TODO check if it is leader
+            RpcResponse rpcResponse = 
this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, 
this.brokerController.getBrokerConfig().getForwardTimeout()).get();
             if (rpcResponse.getException() != null) {
                 throw rpcResponse.getException();
             }

Reply via email to