This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by this push:
     new 86c9f10  Fix code to RpcRequest
86c9f10 is described below

commit 86c9f100cfedb040470a6b0aa69e4635323be96c
Author: dongeforever <[email protected]>
AuthorDate: Tue Nov 16 21:56:07 2021 +0800

    Fix code to RpcRequest
---
 .../broker/processor/AdminBrokerProcessor.java     |  9 ++---
 .../broker/processor/PullMessageProcessor.java     |  3 +-
 .../GetEarliestMsgStoretimeResponseHeader.java     |  3 +-
 .../header/GetMinOffsetResponseHeader.java         |  3 +-
 .../protocol/header/PullMessageResponseHeader.java |  3 +-
 .../header/SearchOffsetResponseHeader.java         |  3 +-
 .../apache/rocketmq/common/rpc/RequestBuilder.java |  2 -
 .../apache/rocketmq/common/rpc/RpcClientImpl.java  | 16 +++-----
 .../apache/rocketmq/common/rpc/RpcClientUtils.java |  4 +-
 .../org/apache/rocketmq/common/rpc/RpcHeader.java  | 46 ----------------------
 .../org/apache/rocketmq/common/rpc/RpcRequest.java |  8 +++-
 .../rocketmq/common/rpc/RpcRequestHeader.java      |  4 +-
 .../apache/rocketmq/common/rpc/RpcResponse.java    | 20 +++++++---
 13 files changed, 40 insertions(+), 84 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 0a8f58c..3584316 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -646,9 +646,8 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
                     requestHeader.setPhysical(true);
                     requestHeader.setTimestamp(timestamp);
                     requestHeader.setQueueId(item.getQueueId());
-                    
requestHeader.setCode(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP);
                     requestHeader.setBname(item.getBname());
-                    RpcRequest rpcRequest = new RpcRequest(requestHeader, 
null);
+                    RpcRequest rpcRequest = new 
RpcRequest(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader, null);
                     RpcResponse rpcResponse = 
this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, 
this.brokerController.getBrokerConfig().getForwardTimeout()).get();
                     if (rpcResponse.getException() != null) {
                         throw rpcResponse.getException();
@@ -753,11 +752,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d does not exit in request process of current broker %s", 
mappingContext.getTopic(), mappingContext.getGlobalId(), 
mappingDetail.getBname()));
         };
         try {
-            requestHeader.setCode(RequestCode.GET_MIN_OFFSET);
             requestHeader.setBname(mappingItem.getBname());
             requestHeader.setPhysical(true);
             //TODO check if it is leader
-            RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
+            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, 
requestHeader, null);
             RpcResponse rpcResponse = 
this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, 
this.brokerController.getBrokerConfig().getForwardTimeout()).get();
             if (rpcResponse.getException() != null) {
                 throw rpcResponse.getException();
@@ -808,10 +806,9 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
             return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, 
String.format("%s-%d does not exit in request process of current broker %s", 
mappingContext.getTopic(), mappingContext.getGlobalId(), 
mappingDetail.getBname()));
         };
         try {
-            requestHeader.setCode(RequestCode.GET_MIN_OFFSET);
             requestHeader.setBname(mappingItem.getBname());
             requestHeader.setPhysical(true);
-            RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
+            RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET, 
requestHeader, null);
             //TODO check if it is leader
             RpcResponse rpcResponse = 
this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, 
this.brokerController.getBrokerConfig().getForwardTimeout()).get();
             if (rpcResponse.getException() != null) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 0cd4668..df89889 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -145,8 +145,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
 
             requestHeader.setPhysical(true);
             requestHeader.setBname(bname);
-            requestHeader.setCode(RequestCode.PULL_MESSAGE);
-            RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
+            RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, 
requestHeader, null);
             RpcResponse rpcResponse = 
this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, 
this.brokerController.getBrokerConfig().getForwardTimeout()).get();
             if (rpcResponse.getException() != null) {
                 throw rpcResponse.getException();
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java
index 10e7d82..6b9b3b2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java
@@ -20,12 +20,11 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.common.rpc.RpcHeader;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetEarliestMsgStoretimeResponseHeader extends RpcHeader {
+public class GetEarliestMsgStoretimeResponseHeader implements 
CommandCustomHeader {
     @CFNotNull
     private Long timestamp;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
index 57cb050..6fc0fac 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
@@ -20,12 +20,11 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.common.rpc.RpcHeader;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class GetMinOffsetResponseHeader extends RpcHeader {
+public class GetMinOffsetResponseHeader implements CommandCustomHeader {
     @CFNotNull
     private Long offset;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
index e2f896b..88af984 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
@@ -20,13 +20,12 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.common.rpc.RpcHeader;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.annotation.CFNullable;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class PullMessageResponseHeader extends RpcHeader {
+public class PullMessageResponseHeader implements CommandCustomHeader {
     @CFNotNull
     private Long suggestWhichBrokerId;
     @CFNotNull
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java
index 416f7f7..f88ac68 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java
@@ -20,12 +20,11 @@
  */
 package org.apache.rocketmq.common.protocol.header;
 
-import org.apache.rocketmq.common.rpc.RpcHeader;
 import org.apache.rocketmq.remoting.CommandCustomHeader;
 import org.apache.rocketmq.remoting.annotation.CFNotNull;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 
-public class SearchOffsetResponseHeader extends RpcHeader {
+public class SearchOffsetResponseHeader implements CommandCustomHeader {
     @CFNotNull
     private Long offset;
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
index a791543..4b5c62b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
@@ -25,7 +25,6 @@ public class RequestBuilder {
         }
         try {
             RpcRequestHeader requestHeader = (RpcRequestHeader) 
requestHeaderClass.newInstance();
-            requestHeader.setCode(requestCode);
             requestHeader.setOneway(oneway);
             requestHeader.setBname(destBrokerName);
             return requestHeader;
@@ -53,7 +52,6 @@ public class RequestBuilder {
         }
         try {
             TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader) 
requestHeaderClass.newInstance();
-            requestHeader.setCode(requestCode);
             requestHeader.setOneway(oneway);
             requestHeader.setBname(destBrokerName);
             requestHeader.setTopic(topic);
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 8d557a1..245cc3a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -61,12 +61,12 @@ public class RpcClientImpl implements RpcClient {
         String addr = 
getBrokerAddrByNameOrException(request.getHeader().bname);
         Promise<RpcResponse> rpcResponsePromise = null;
         try {
-            switch (request.getHeader().getCode()) {
+            switch (request.getCode()) {
                 case RequestCode.PULL_MESSAGE:
                     rpcResponsePromise = handlePullMessage(addr, request, 
timeoutMs);
                     break;
                 default:
-                    throw new 
RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + 
request.getHeader().getCode());
+                    throw new 
RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " + 
request.getCode());
             }
         } catch (RpcException rpcException) {
             throw rpcException;
@@ -128,8 +128,7 @@ public class RpcClientImpl implements RpcClient {
                         case ResponseCode.PULL_OFFSET_MOVED:
                             PullMessageResponseHeader responseHeader =
                                     (PullMessageResponseHeader) 
responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
-                            responseHeader.setCode(responseCommand.getCode());
-                            rpcResponsePromise.setSuccess(new 
RpcResponse(responseHeader, responseCommand.getBody()));
+                            rpcResponsePromise.setSuccess(new 
RpcResponse(responseCommand.getCode(), responseHeader, 
responseCommand.getBody()));
                         default:
                             RpcResponse rpcResponse = new RpcResponse(new 
RpcException(responseCommand.getCode(), "unexpected remote response code"));
                             rpcResponsePromise.setSuccess(rpcResponse);
@@ -156,8 +155,7 @@ public class RpcClientImpl implements RpcClient {
             case ResponseCode.SUCCESS: {
                 SearchOffsetResponseHeader responseHeader =
                         (SearchOffsetResponseHeader) 
responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
-                responseHeader.setCode(responseCommand.getCode());
-                return new RpcResponse(responseHeader, 
responseCommand.getBody());
+                return new RpcResponse(responseCommand.getCode(), 
responseHeader, responseCommand.getBody());
             }
             default:{
                 RpcResponse rpcResponse = new RpcResponse(new 
RpcException(responseCommand.getCode(), "unknown remote error"));
@@ -177,8 +175,7 @@ public class RpcClientImpl implements RpcClient {
             case ResponseCode.SUCCESS: {
                 GetMinOffsetResponseHeader responseHeader =
                         (GetMinOffsetResponseHeader) 
responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
-                responseHeader.setCode(responseCommand.getCode());
-                return new RpcResponse(responseHeader, 
responseCommand.getBody());
+                return new RpcResponse(responseCommand.getCode(), 
responseHeader, responseCommand.getBody());
             }
             default:{
                 RpcResponse rpcResponse = new RpcResponse(new 
RpcException(responseCommand.getCode(), "unknown remote error"));
@@ -198,8 +195,7 @@ public class RpcClientImpl implements RpcClient {
             case ResponseCode.SUCCESS: {
                 GetEarliestMsgStoretimeResponseHeader responseHeader =
                         (GetEarliestMsgStoretimeResponseHeader) 
responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
-                responseHeader.setCode(responseCommand.getCode());
-                return new RpcResponse(responseHeader, 
responseCommand.getBody());
+                return new RpcResponse(responseCommand.getCode(), 
responseHeader, responseCommand.getBody());
 
             }
             default:{
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
index df14c22..ebae12b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
@@ -8,13 +8,13 @@ import java.nio.ByteBuffer;
 public class RpcClientUtils {
 
     public static RemotingCommand createCommandForRpcRequest(RpcRequest 
rpcRequest) {
-        RemotingCommand cmd = 
RemotingCommand.createRequestCommand(rpcRequest.getHeader().getCode(), 
rpcRequest.getHeader());
+        RemotingCommand cmd = 
RemotingCommand.createRequestCommand(rpcRequest.getCode(), 
rpcRequest.getHeader());
         cmd.setBody(encodeBody(rpcRequest.getBody()));
         return cmd;
     }
 
     public static RemotingCommand createCommandForRpcResponse(RpcResponse 
rpcResponse) {
-        RemotingCommand cmd = 
RemotingCommand.createResponseCommandWithHeader(rpcResponse.getHeader().getCode(),
 rpcResponse.getHeader());
+        RemotingCommand cmd = 
RemotingCommand.createResponseCommandWithHeader(rpcResponse.getCode(), 
rpcResponse.getHeader());
         cmd.setRemark(rpcResponse.getException() == null ? "" : 
rpcResponse.getException().getMessage());
         cmd.setBody(encodeBody(rpcResponse.getBody()));
         return cmd;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java
deleted file mode 100644
index 18b64d5..0000000
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.common.rpc;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class RpcHeader implements CommandCustomHeader {
-
-
-    protected int code;
-
-    public RpcHeader() {
-    }
-
-    public RpcHeader(int code) {
-        this.code = code;
-    }
-
-    public int getCode() {
-        return code;
-    }
-
-    public void setCode(int code) {
-        this.code = code;
-    }
-
-    @Override
-    public void checkFields() throws RemotingCommandException {
-
-    }
-}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
index 3ebe3fe..90bb696 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
@@ -17,10 +17,12 @@
 package org.apache.rocketmq.common.rpc;
 
 public class RpcRequest {
+    int code;
     private RpcRequestHeader header;
     private Object body;
 
-    public RpcRequest(RpcRequestHeader header, Object body) {
+    public RpcRequest(int code, RpcRequestHeader header, Object body) {
+        this.code = code;
         this.header = header;
         this.body = body;
     }
@@ -32,4 +34,8 @@ public class RpcRequest {
     public Object getBody() {
         return body;
     }
+
+    public int getCode() {
+        return code;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
index c5c748d..577865e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
@@ -16,7 +16,9 @@
  */
 package org.apache.rocketmq.common.rpc;
 
-public abstract class RpcRequestHeader extends RpcHeader {
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+
+public abstract class RpcRequestHeader implements CommandCustomHeader {
     //the namespace name
     protected String namespace;
     //if the data has been namespaced
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
index be7cf9b..5fcde36 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
@@ -16,8 +16,11 @@
  */
 package org.apache.rocketmq.common.rpc;
 
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+
 public class RpcResponse   {
-    private RpcHeader header;
+    private int code;
+    private CommandCustomHeader header;
     private Object body;
     public RpcException exception;
 
@@ -25,21 +28,26 @@ public class RpcResponse   {
 
     }
 
-    public RpcResponse(RpcHeader header, byte[] body) {
+    public RpcResponse(int code, CommandCustomHeader header, byte[] body) {
+        this.code = code;
         this.header = header;
         this.body = body;
     }
 
-    public RpcResponse(RpcException rpcException) {
-        this.header = new RpcHeader(rpcException.getErrorCode());
+    RpcResponse(RpcException rpcException) {
+        this.code = rpcException.getErrorCode();
         this.exception = rpcException;
     }
 
-    public RpcHeader getHeader() {
+    public int getCode() {
+        return code;
+    }
+
+    public CommandCustomHeader getHeader() {
         return header;
     }
 
-    public void setHeader(RpcHeader header) {
+    public void setHeader(CommandCustomHeader header) {
         this.header = header;
     }
 

Reply via email to