This is an automated email from the ASF dual-hosted git repository.
dongeforever pushed a commit to branch 5.0.0-alpha-static-topic
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-alpha-static-topic by
this push:
new 86c9f10 Fix code to RpcRequest
86c9f10 is described below
commit 86c9f100cfedb040470a6b0aa69e4635323be96c
Author: dongeforever <[email protected]>
AuthorDate: Tue Nov 16 21:56:07 2021 +0800
Fix code to RpcRequest
---
.../broker/processor/AdminBrokerProcessor.java | 9 ++---
.../broker/processor/PullMessageProcessor.java | 3 +-
.../GetEarliestMsgStoretimeResponseHeader.java | 3 +-
.../header/GetMinOffsetResponseHeader.java | 3 +-
.../protocol/header/PullMessageResponseHeader.java | 3 +-
.../header/SearchOffsetResponseHeader.java | 3 +-
.../apache/rocketmq/common/rpc/RequestBuilder.java | 2 -
.../apache/rocketmq/common/rpc/RpcClientImpl.java | 16 +++-----
.../apache/rocketmq/common/rpc/RpcClientUtils.java | 4 +-
.../org/apache/rocketmq/common/rpc/RpcHeader.java | 46 ----------------------
.../org/apache/rocketmq/common/rpc/RpcRequest.java | 8 +++-
.../rocketmq/common/rpc/RpcRequestHeader.java | 4 +-
.../apache/rocketmq/common/rpc/RpcResponse.java | 20 +++++++---
13 files changed, 40 insertions(+), 84 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 0a8f58c..3584316 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -646,9 +646,8 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
requestHeader.setPhysical(true);
requestHeader.setTimestamp(timestamp);
requestHeader.setQueueId(item.getQueueId());
-
requestHeader.setCode(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP);
requestHeader.setBname(item.getBname());
- RpcRequest rpcRequest = new RpcRequest(requestHeader,
null);
+ RpcRequest rpcRequest = new
RpcRequest(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader, null);
RpcResponse rpcResponse =
this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest,
this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
@@ -753,11 +752,10 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in request process of current broker %s",
mappingContext.getTopic(), mappingContext.getGlobalId(),
mappingDetail.getBname()));
};
try {
- requestHeader.setCode(RequestCode.GET_MIN_OFFSET);
requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true);
//TODO check if it is leader
- RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET,
requestHeader, null);
RpcResponse rpcResponse =
this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest,
this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
@@ -808,10 +806,9 @@ public class AdminBrokerProcessor extends
AsyncNettyRequestProcessor implements
return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE,
String.format("%s-%d does not exit in request process of current broker %s",
mappingContext.getTopic(), mappingContext.getGlobalId(),
mappingDetail.getBname()));
};
try {
- requestHeader.setCode(RequestCode.GET_MIN_OFFSET);
requestHeader.setBname(mappingItem.getBname());
requestHeader.setPhysical(true);
- RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.GET_MIN_OFFSET,
requestHeader, null);
//TODO check if it is leader
RpcResponse rpcResponse =
this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest,
this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 0cd4668..df89889 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -145,8 +145,7 @@ public class PullMessageProcessor extends
AsyncNettyRequestProcessor implements
requestHeader.setPhysical(true);
requestHeader.setBname(bname);
- requestHeader.setCode(RequestCode.PULL_MESSAGE);
- RpcRequest rpcRequest = new RpcRequest(requestHeader, null);
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE,
requestHeader, null);
RpcResponse rpcResponse =
this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest,
this.brokerController.getBrokerConfig().getForwardTimeout()).get();
if (rpcResponse.getException() != null) {
throw rpcResponse.getException();
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java
index 10e7d82..6b9b3b2 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetEarliestMsgStoretimeResponseHeader.java
@@ -20,12 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class GetEarliestMsgStoretimeResponseHeader extends RpcHeader {
+public class GetEarliestMsgStoretimeResponseHeader implements
CommandCustomHeader {
@CFNotNull
private Long timestamp;
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
index 57cb050..6fc0fac 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/GetMinOffsetResponseHeader.java
@@ -20,12 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class GetMinOffsetResponseHeader extends RpcHeader {
+public class GetMinOffsetResponseHeader implements CommandCustomHeader {
@CFNotNull
private Long offset;
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
index e2f896b..88af984 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/PullMessageResponseHeader.java
@@ -20,13 +20,12 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class PullMessageResponseHeader extends RpcHeader {
+public class PullMessageResponseHeader implements CommandCustomHeader {
@CFNotNull
private Long suggestWhichBrokerId;
@CFNotNull
diff --git
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java
index 416f7f7..f88ac68 100644
---
a/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java
+++
b/common/src/main/java/org/apache/rocketmq/common/protocol/header/SearchOffsetResponseHeader.java
@@ -20,12 +20,11 @@
*/
package org.apache.rocketmq.common.protocol.header;
-import org.apache.rocketmq.common.rpc.RpcHeader;
import org.apache.rocketmq.remoting.CommandCustomHeader;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-public class SearchOffsetResponseHeader extends RpcHeader {
+public class SearchOffsetResponseHeader implements CommandCustomHeader {
@CFNotNull
private Long offset;
diff --git
a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
index a791543..4b5c62b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RequestBuilder.java
@@ -25,7 +25,6 @@ public class RequestBuilder {
}
try {
RpcRequestHeader requestHeader = (RpcRequestHeader)
requestHeaderClass.newInstance();
- requestHeader.setCode(requestCode);
requestHeader.setOneway(oneway);
requestHeader.setBname(destBrokerName);
return requestHeader;
@@ -53,7 +52,6 @@ public class RequestBuilder {
}
try {
TopicQueueRequestHeader requestHeader = (TopicQueueRequestHeader)
requestHeaderClass.newInstance();
- requestHeader.setCode(requestCode);
requestHeader.setOneway(oneway);
requestHeader.setBname(destBrokerName);
requestHeader.setTopic(topic);
diff --git
a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
index 8d557a1..245cc3a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientImpl.java
@@ -61,12 +61,12 @@ public class RpcClientImpl implements RpcClient {
String addr =
getBrokerAddrByNameOrException(request.getHeader().bname);
Promise<RpcResponse> rpcResponsePromise = null;
try {
- switch (request.getHeader().getCode()) {
+ switch (request.getCode()) {
case RequestCode.PULL_MESSAGE:
rpcResponsePromise = handlePullMessage(addr, request,
timeoutMs);
break;
default:
- throw new
RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " +
request.getHeader().getCode());
+ throw new
RpcException(ResponseCode.REQUEST_CODE_NOT_SUPPORTED, "Unknown request code " +
request.getCode());
}
} catch (RpcException rpcException) {
throw rpcException;
@@ -128,8 +128,7 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.PULL_OFFSET_MOVED:
PullMessageResponseHeader responseHeader =
(PullMessageResponseHeader)
responseCommand.decodeCommandCustomHeader(PullMessageResponseHeader.class);
- responseHeader.setCode(responseCommand.getCode());
- rpcResponsePromise.setSuccess(new
RpcResponse(responseHeader, responseCommand.getBody()));
+ rpcResponsePromise.setSuccess(new
RpcResponse(responseCommand.getCode(), responseHeader,
responseCommand.getBody()));
default:
RpcResponse rpcResponse = new RpcResponse(new
RpcException(responseCommand.getCode(), "unexpected remote response code"));
rpcResponsePromise.setSuccess(rpcResponse);
@@ -156,8 +155,7 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.SUCCESS: {
SearchOffsetResponseHeader responseHeader =
(SearchOffsetResponseHeader)
responseCommand.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
- responseHeader.setCode(responseCommand.getCode());
- return new RpcResponse(responseHeader,
responseCommand.getBody());
+ return new RpcResponse(responseCommand.getCode(),
responseHeader, responseCommand.getBody());
}
default:{
RpcResponse rpcResponse = new RpcResponse(new
RpcException(responseCommand.getCode(), "unknown remote error"));
@@ -177,8 +175,7 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.SUCCESS: {
GetMinOffsetResponseHeader responseHeader =
(GetMinOffsetResponseHeader)
responseCommand.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
- responseHeader.setCode(responseCommand.getCode());
- return new RpcResponse(responseHeader,
responseCommand.getBody());
+ return new RpcResponse(responseCommand.getCode(),
responseHeader, responseCommand.getBody());
}
default:{
RpcResponse rpcResponse = new RpcResponse(new
RpcException(responseCommand.getCode(), "unknown remote error"));
@@ -198,8 +195,7 @@ public class RpcClientImpl implements RpcClient {
case ResponseCode.SUCCESS: {
GetEarliestMsgStoretimeResponseHeader responseHeader =
(GetEarliestMsgStoretimeResponseHeader)
responseCommand.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
- responseHeader.setCode(responseCommand.getCode());
- return new RpcResponse(responseHeader,
responseCommand.getBody());
+ return new RpcResponse(responseCommand.getCode(),
responseHeader, responseCommand.getBody());
}
default:{
diff --git
a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
index df14c22..ebae12b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcClientUtils.java
@@ -8,13 +8,13 @@ import java.nio.ByteBuffer;
public class RpcClientUtils {
public static RemotingCommand createCommandForRpcRequest(RpcRequest
rpcRequest) {
- RemotingCommand cmd =
RemotingCommand.createRequestCommand(rpcRequest.getHeader().getCode(),
rpcRequest.getHeader());
+ RemotingCommand cmd =
RemotingCommand.createRequestCommand(rpcRequest.getCode(),
rpcRequest.getHeader());
cmd.setBody(encodeBody(rpcRequest.getBody()));
return cmd;
}
public static RemotingCommand createCommandForRpcResponse(RpcResponse
rpcResponse) {
- RemotingCommand cmd =
RemotingCommand.createResponseCommandWithHeader(rpcResponse.getHeader().getCode(),
rpcResponse.getHeader());
+ RemotingCommand cmd =
RemotingCommand.createResponseCommandWithHeader(rpcResponse.getCode(),
rpcResponse.getHeader());
cmd.setRemark(rpcResponse.getException() == null ? "" :
rpcResponse.getException().getMessage());
cmd.setBody(encodeBody(rpcResponse.getBody()));
return cmd;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java
b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java
deleted file mode 100644
index 18b64d5..0000000
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcHeader.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.common.rpc;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.exception.RemotingCommandException;
-
-public class RpcHeader implements CommandCustomHeader {
-
-
- protected int code;
-
- public RpcHeader() {
- }
-
- public RpcHeader(int code) {
- this.code = code;
- }
-
- public int getCode() {
- return code;
- }
-
- public void setCode(int code) {
- this.code = code;
- }
-
- @Override
- public void checkFields() throws RemotingCommandException {
-
- }
-}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
index 3ebe3fe..90bb696 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequest.java
@@ -17,10 +17,12 @@
package org.apache.rocketmq.common.rpc;
public class RpcRequest {
+ int code;
private RpcRequestHeader header;
private Object body;
- public RpcRequest(RpcRequestHeader header, Object body) {
+ public RpcRequest(int code, RpcRequestHeader header, Object body) {
+ this.code = code;
this.header = header;
this.body = body;
}
@@ -32,4 +34,8 @@ public class RpcRequest {
public Object getBody() {
return body;
}
+
+ public int getCode() {
+ return code;
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
index c5c748d..577865e 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcRequestHeader.java
@@ -16,7 +16,9 @@
*/
package org.apache.rocketmq.common.rpc;
-public abstract class RpcRequestHeader extends RpcHeader {
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+
+public abstract class RpcRequestHeader implements CommandCustomHeader {
//the namespace name
protected String namespace;
//if the data has been namespaced
diff --git
a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
index be7cf9b..5fcde36 100644
--- a/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rpc/RpcResponse.java
@@ -16,8 +16,11 @@
*/
package org.apache.rocketmq.common.rpc;
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+
public class RpcResponse {
- private RpcHeader header;
+ private int code;
+ private CommandCustomHeader header;
private Object body;
public RpcException exception;
@@ -25,21 +28,26 @@ public class RpcResponse {
}
- public RpcResponse(RpcHeader header, byte[] body) {
+ public RpcResponse(int code, CommandCustomHeader header, byte[] body) {
+ this.code = code;
this.header = header;
this.body = body;
}
- public RpcResponse(RpcException rpcException) {
- this.header = new RpcHeader(rpcException.getErrorCode());
+ RpcResponse(RpcException rpcException) {
+ this.code = rpcException.getErrorCode();
this.exception = rpcException;
}
- public RpcHeader getHeader() {
+ public int getCode() {
+ return code;
+ }
+
+ public CommandCustomHeader getHeader() {
return header;
}
- public void setHeader(RpcHeader header) {
+ public void setHeader(CommandCustomHeader header) {
this.header = header;
}