This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 311d5f4a4b453a5a96143ac9cc56294cbbde028f
Author: dongeforever <[email protected]>
AuthorDate: Thu Nov 11 15:39:38 2021 +0800

    Try to abstract the rpc layer for BrokerOuterAPI
---
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java | 126 ++++++++++++++++++++-
 .../broker/processor/AdminBrokerProcessor.java     |  33 ++++++
 .../protocol/header/SearchOffsetRequestHeader.java |   8 +-
 .../org/apache/rocketmq/remoting/RpcRequest.java   |  41 +++++++
 .../org/apache/rocketmq/remoting/RpcResponse.java  |  50 ++++++++
 .../remoting/protocol/RemotingCommand.java         |  19 ++++
 6 files changed, 269 insertions(+), 8 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java 
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 7f3ce81..c5f8b90 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -45,8 +45,15 @@ import 
org.apache.rocketmq.common.protocol.body.RegisterBrokerBody;
 import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import 
org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import 
org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader;
@@ -59,6 +66,8 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingClient;
+import org.apache.rocketmq.remoting.RpcRequest;
+import org.apache.rocketmq.remoting.RpcResponse;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -75,6 +84,7 @@ public class BrokerOuterAPI {
     private final BrokerController brokerController;
     private final TopAddressing topAddressing = new 
TopAddressing(MixAll.getWSAddr());
     private String nameSrvAddr = null;
+    private final String currBrokerName;
     private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new 
BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
         new ArrayBlockingQueue<Runnable>(32), new 
ThreadFactoryImpl("brokerOutApi_thread_", true));
 
@@ -86,6 +96,7 @@ public class BrokerOuterAPI {
         this.remotingClient = new NettyRemotingClient(nettyClientConfig);
         this.remotingClient.registerRPCHook(rpcHook);
         this.brokerController = brokerController;
+        this.currBrokerName =  
brokerController.getBrokerConfig().getBrokerName();
     }
 
     public void start() {
@@ -463,14 +474,13 @@ public class BrokerOuterAPI {
         this.remotingClient.invokeAsync(brokerAddr, request, timeoutMillis, 
invokeCallback);
     }
 
-    public RemotingCommand pullMessage(String brokerName, 
PullMessageRequestHeader requestHeader, long timeoutMillis) throws Exception {
+    public RemotingCommand pullMessage(String bname, PullMessageRequestHeader 
requestHeader, long timeoutMillis) throws Exception {
 
-        String addr = this.brokerController.getBrokerAddrByName(brokerName);
+        String addr = this.brokerController.getBrokerAddrByName(bname);
         if (addr == null) {
-            final RemotingCommand response = 
RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
-            response.setCode(ResponseCode.SYSTEM_ERROR);
-            response.setRemark(String.format("%s-%d cannot find addr when 
forward to broker %s in broker %s", requestHeader.getTopic(), 
requestHeader.getQueueId(), brokerName, 
this.brokerController.getBrokerConfig().getBrokerName()));
-            return response;
+            return 
RemotingCommand.buildErrorResponse(ResponseCode.SYSTEM_ERROR,
+                    String.format("%s-%d cannot find addr when forward to 
broker %s in broker %s", requestHeader.getTopic(), requestHeader.getQueueId(), 
bname, currBrokerName),
+                    PullMessageResponseHeader.class);
         }
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
         RemotingCommand response = this.remotingClient.invokeSync(addr, 
request, timeoutMillis);
@@ -478,4 +488,108 @@ public class BrokerOuterAPI {
         return response;
 
     }
+
+    public RpcResponse pullMessage(String bname, RpcRequest rpcRequest, long 
timeoutMillis) throws Exception {
+        String addr = this.brokerController.getBrokerAddrByName(bname);
+        if (addr == null) {
+            RpcResponse rpcResponse = new 
RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
+            rpcResponse.setException(new 
MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + 
bname, addr));
+            return rpcResponse;
+        }
+        RemotingCommand requestCommand = 
RemotingCommand.createRequestCommand(rpcRequest );
+        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, 
requestCommand, timeoutMillis);
+        assert responseCommand != null;
+
+        switch (responseCommand.getCode()) {
+            case ResponseCode.SUCCESS:
+            case ResponseCode.PULL_NOT_FOUND:
+            case ResponseCode.PULL_RETRY_IMMEDIATELY:
+            case ResponseCode.PULL_OFFSET_MOVED:
+                PullMessageResponseHeader responseHeader =
+                        (PullMessageResponseHeader) 
responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+                return new RpcResponse(responseCommand.getCode(), 
responseHeader, responseCommand.getBody());
+            default:
+                RpcResponse rpcResponse = new 
RpcResponse(responseCommand.getCode(), null, null);
+                rpcResponse.setException(new 
MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
+                return rpcResponse;
+        }
+    }
+
+    public RpcResponse searchOffset(String bname, RpcRequest rpcRequest, long 
timeoutMillis) throws Exception {
+        String addr = this.brokerController.getBrokerAddrByName(bname);
+        if (addr == null) {
+            RpcResponse rpcResponse = new 
RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
+            rpcResponse.setException(new 
MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + 
bname, addr));
+            return rpcResponse;
+        }
+        RemotingCommand requestCommand = 
RemotingCommand.createRequestCommand(rpcRequest);
+        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, 
requestCommand, timeoutMillis);
+        assert responseCommand != null;
+        switch (responseCommand.getCode()) {
+            case ResponseCode.SUCCESS: {
+                SearchOffsetResponseHeader responseHeader =
+                        (SearchOffsetResponseHeader) 
responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
+                return new RpcResponse(responseCommand.getCode(), 
responseHeader, responseCommand.getBody());
+            }
+            default:{
+                RpcResponse rpcResponse = new 
RpcResponse(responseCommand.getCode(), null, null);
+                rpcResponse.setException(new 
MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
+                return rpcResponse;
+            }
+        }
+    }
+
+    public RpcResponse getMinOffset(String bname, RpcRequest rpcRequest, long 
timeoutMillis) throws Exception {
+        String addr = this.brokerController.getBrokerAddrByName(bname);
+        if (addr == null) {
+            RpcResponse rpcResponse = new 
RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
+            rpcResponse.setException(new 
MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + 
bname, addr));
+            return rpcResponse;
+        }
+
+        RemotingCommand requestCommand = 
RemotingCommand.createRequestCommand(rpcRequest);
+
+        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, 
requestCommand, timeoutMillis);
+        assert responseCommand != null;
+        switch (responseCommand.getCode()) {
+            case ResponseCode.SUCCESS: {
+                GetMinOffsetResponseHeader responseHeader =
+                        (GetMinOffsetResponseHeader) 
responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
+                return new RpcResponse(responseCommand.getCode(), 
responseHeader, responseCommand.getBody());
+            }
+            default:{
+                RpcResponse rpcResponse = new 
RpcResponse(responseCommand.getCode(), null, null);
+                rpcResponse.setException(new 
MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
+                return rpcResponse;
+            }
+        }
+    }
+
+    public RpcResponse getEarliestMsgStoretime(String bname, RpcRequest 
rpcRequest, long timeoutMillis) throws Exception {
+        String addr = this.brokerController.getBrokerAddrByName(bname);
+        if (addr == null) {
+            RpcResponse rpcResponse = new 
RpcResponse(ResponseCode.SYSTEM_ERROR, null, null);
+            rpcResponse.setException(new 
MQBrokerException(ResponseCode.SYSTEM_ERROR, "cannot find addr for broker " + 
bname, addr));
+            return rpcResponse;
+        }
+
+        RemotingCommand requestCommand = 
RemotingCommand.createRequestCommand(rpcRequest);
+
+        RemotingCommand responseCommand = this.remotingClient.invokeSync(addr, 
requestCommand, timeoutMillis);
+        assert responseCommand != null;
+        switch (responseCommand.getCode()) {
+            case ResponseCode.SUCCESS: {
+                GetEarliestMsgStoretimeResponseHeader responseHeader =
+                        (GetEarliestMsgStoretimeResponseHeader) 
responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
+                return new RpcResponse(responseCommand.getCode(), 
responseHeader, responseCommand.getBody());
+
+            }
+            default:{
+                RpcResponse rpcResponse = new 
RpcResponse(responseCommand.getCode(), null, null);
+                rpcResponse.setException(new 
MQBrokerException(responseCommand.getCode(), "unknown remote error", addr));
+                return rpcResponse;
+            }
+        }
+    }
+
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 6bd742c..0b6fa48 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -139,6 +139,8 @@ import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.SelectMappedBufferResult;
 
+import static 
org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;
+
 public class AdminBrokerProcessor extends AsyncNettyRequestProcessor 
implements NettyRequestProcessor {
     private static final InternalLogger log = 
InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final BrokerController brokerController;
@@ -614,6 +616,29 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         return response;
     }
 
+    private RemotingCommand 
rewriteRequestForStaticTopic(SearchOffsetRequestHeader requestHeader, 
TopicQueueMappingContext mappingContext) {
+        try {
+            if (mappingContext.getMappingDetail() == null) {
+                return null;
+            }
+            TopicQueueMappingDetail mappingDetail = 
mappingContext.getMappingDetail();
+            LogicQueueMappingItem mappingItem = 
mappingContext.getMappingItem();
+            if (mappingItem == null
+                    || 
!mappingDetail.getBname().equals(mappingItem.getBname())) {
+                return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d does not exit in request process of current broker %s", 
requestHeader.getTopic(), requestHeader.getQueueId(), 
mappingDetail.getBname()));
+            }
+            ImmutableList<LogicQueueMappingItem> mappingItems = 
mappingContext.getMappingItemList();
+            //TODO should make sure the offset timestamp is equal or bigger 
than the searched timestamp
+            for (int i = mappingItems.size() - 1; i >=0; i--) {
+
+            }
+            requestHeader.setQueueId(mappingItem.getQueueId());
+            return null;
+        } catch (Throwable t) {
+            return buildErrorResponse(ResponseCode.SYSTEM_ERROR, 
t.getMessage());
+        }
+    }
+
     private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
@@ -621,6 +646,14 @@ public class AdminBrokerProcessor extends 
AsyncNettyRequestProcessor implements
         final SearchOffsetRequestHeader requestHeader =
             (SearchOffsetRequestHeader) 
request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);
 
+        {
+            TopicQueueMappingContext mappingContext = 
this.brokerController.getTopicQueueMappingManager().buildTopicQueueMappingContext(requestHeader);
+            TopicQueueMappingDetail mappingDetail = 
mappingContext.getMappingDetail();
+            if (mappingDetail != null) {
+
+            }
+        }
+
         long offset = 
this.brokerController.getMessageStore().getOffsetInQueueByTime(requestHeader.getTopic(),
 requestHeader.getQueueId(),
             requestHeader.getTimestamp());
 
diff --git 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
index 5ea2e24..e3a4b1d 100644
--- 
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
+++ 
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetRequestHeader.java
@@ -20,11 +20,11 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.TopicQueueRequestHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class SearchOffsetRequestHeader implements CommandCustomHeader {
+public class SearchOffsetRequestHeader extends TopicQueueRequestHeader {
     @CFNotNull
     private String topic;
     @CFNotNull
@@ -37,18 +37,22 @@ public class SearchOffsetRequestHeader implements 
CommandCustomHeader {
 
     }
 
+    @Override
     public String getTopic() {
         return topic;
     }
 
+    @Override
     public void setTopic(String topic) {
         this.topic = topic;
     }
 
+    @Override
     public Integer getQueueId() {
         return queueId;
     }
 
+    @Override
     public void setQueueId(Integer queueId) {
         this.queueId = queueId;
     }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/RpcRequest.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/RpcRequest.java
new file mode 100644
index 0000000..e8e03d5
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RpcRequest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting;
+
+public class RpcRequest {
+    private int code;
+    private CommandCustomHeader header;
+    private byte[] body;
+
+    public RpcRequest(int code, CommandCustomHeader header, byte[] body) {
+        this.code = code;
+        this.header = header;
+        this.body = body;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public CommandCustomHeader getHeader() {
+        return header;
+    }
+
+    public byte[] getBody() {
+        return body;
+    }
+}
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/RpcResponse.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/RpcResponse.java
new file mode 100644
index 0000000..9ae9950
--- /dev/null
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RpcResponse.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.remoting;
+
+public class RpcResponse   {
+    private int code;
+    private CommandCustomHeader header;
+    private byte[] body;
+    public Exception exception;
+
+    public RpcResponse(int code, CommandCustomHeader header, byte[] body) {
+        this.code = code;
+        this.header = header;
+        this.body = body;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public CommandCustomHeader getHeader() {
+        return header;
+    }
+
+    public byte[] getBody() {
+        return body;
+    }
+
+    public Exception getException() {
+        return exception;
+    }
+
+    public void setException(Exception exception) {
+        this.exception = exception;
+    }
+}
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 34a1790..8fc3f9e 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@ -20,11 +20,13 @@ import com.alibaba.fastjson.annotation.JSONField;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.RpcRequest;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -85,6 +87,15 @@ public class RemotingCommand {
     protected RemotingCommand() {
     }
 
+    public static RemotingCommand createRequestCommand(RpcRequest rpcRequest) {
+        RemotingCommand cmd = new RemotingCommand();
+        cmd.setCode(rpcRequest.getCode());
+        cmd.customHeader = rpcRequest.getHeader();
+        setCmdVersion(cmd);
+        cmd.setBody(rpcRequest.getBody());
+        return cmd;
+    }
+
     public static RemotingCommand createRequestCommand(int code, 
CommandCustomHeader customHeader) {
         RemotingCommand cmd = new RemotingCommand();
         cmd.setCode(code);
@@ -110,6 +121,13 @@ public class RemotingCommand {
         return createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, 
"not set any response code", classHeader);
     }
 
+    public static RemotingCommand buildErrorResponse(int code, String remark, 
Class<? extends CommandCustomHeader> classHeader) {
+        final RemotingCommand response = 
RemotingCommand.createResponseCommand(classHeader);
+        response.setCode(code);
+        response.setRemark(remark);
+        return response;
+    }
+
     public static RemotingCommand buildErrorResponse(int code, String remark) {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
         response.setCode(code);
@@ -117,6 +135,7 @@ public class RemotingCommand {
         return response;
     }
 
+
     public static RemotingCommand createResponseCommand(int code, String 
remark,
         Class<? extends CommandCustomHeader> classHeader) {
         RemotingCommand cmd = new RemotingCommand();

Reply via email to