This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 5ac91199f [ISSUE #6380] Optimize switch cases in BrokerOuterAPI.
(#6381)
5ac91199f is described below
commit 5ac91199f2c64be9f221dec7c25acb5e84cf91cd
Author: Ji Juntao <[email protected]>
AuthorDate: Fri Mar 17 15:56:36 2023 +0800
[ISSUE #6380] Optimize switch cases in BrokerOuterAPI. (#6381)
* [ISSUE #6380] Optimize switch cases in BrokerOuterAPI.
* optimize the logic.
---
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 107 ++++++++-------------
1 file changed, 39 insertions(+), 68 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 144f05016..6c166331e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -135,11 +135,7 @@ import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMetrics;
import static
org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode.SUCCESS;
-import static
org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_METADATA_NOT_EXIST;
-import static
org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_BROKER_NEED_TO_BE_REGISTERED;
-import static
org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_ELECT_MASTER_FAILED;
import static
org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_MASTER_STILL_EXIST;
-import static
org.apache.rocketmq.remoting.protocol.ResponseCode.CONTROLLER_NOT_LEADER;
public class BrokerOuterAPI {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -611,9 +607,6 @@ public class BrokerOuterAPI {
case ResponseCode.SUCCESS: {
return;
}
- case ResponseCode.SYSTEM_ERROR: {
- throw new MQBrokerException(response.getCode(),
response.getRemark(), brokerAddr);
- }
default:
break;
}
@@ -981,65 +974,57 @@ public class BrokerOuterAPI {
final Message msg,
final RemotingCommand response
) throws MQBrokerException, RemotingCommandException {
+ SendStatus sendStatus = null;
switch (response.getCode()) {
case ResponseCode.FLUSH_DISK_TIMEOUT:
+ sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
+ break;
case ResponseCode.FLUSH_SLAVE_TIMEOUT:
+ sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
+ break;
case ResponseCode.SLAVE_NOT_AVAILABLE:
+ sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
+ break;
case ResponseCode.SUCCESS: {
- SendStatus sendStatus = SendStatus.SEND_OK;
- switch (response.getCode()) {
- case ResponseCode.FLUSH_DISK_TIMEOUT:
- sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
- break;
- case ResponseCode.FLUSH_SLAVE_TIMEOUT:
- sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
- break;
- case ResponseCode.SLAVE_NOT_AVAILABLE:
- sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
- break;
- case ResponseCode.SUCCESS:
- sendStatus = SendStatus.SEND_OK;
- break;
- default:
- assert false;
- break;
- }
-
- SendMessageResponseHeader responseHeader =
+ sendStatus = SendStatus.SEND_OK;
+ break;
+ }
+ default:
+ break;
+ }
+ if (sendStatus != null) {
+ SendMessageResponseHeader responseHeader =
(SendMessageResponseHeader)
response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
- //If namespace not null , reset Topic without namespace.
- String topic = msg.getTopic();
+ //If namespace not null , reset Topic without namespace.
+ String topic = msg.getTopic();
- MessageQueue messageQueue = new MessageQueue(topic,
brokerName, responseHeader.getQueueId());
+ MessageQueue messageQueue = new MessageQueue(topic, brokerName,
responseHeader.getQueueId());
- String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
- if (msg instanceof MessageBatch) {
- StringBuilder sb = new StringBuilder();
- for (Message message : (MessageBatch) msg) {
- sb.append(sb.length() == 0 ? "" :
",").append(MessageClientIDSetter.getUniqID(message));
- }
- uniqMsgId = sb.toString();
+ String uniqMsgId = MessageClientIDSetter.getUniqID(msg);
+ if (msg instanceof MessageBatch) {
+ StringBuilder sb = new StringBuilder();
+ for (Message message : (MessageBatch) msg) {
+ sb.append(sb.length() == 0 ? "" :
",").append(MessageClientIDSetter.getUniqID(message));
}
- SendResult sendResult = new SendResult(sendStatus,
+ uniqMsgId = sb.toString();
+ }
+ SendResult sendResult = new SendResult(sendStatus,
uniqMsgId,
responseHeader.getMsgId(), messageQueue,
responseHeader.getQueueOffset());
- sendResult.setTransactionId(responseHeader.getTransactionId());
- String regionId =
response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
- String traceOn =
response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
- if (regionId == null || regionId.isEmpty()) {
- regionId = MixAll.DEFAULT_TRACE_REGION_ID;
- }
- if (traceOn != null && traceOn.equals("false")) {
- sendResult.setTraceOn(false);
- } else {
- sendResult.setTraceOn(true);
- }
- sendResult.setRegionId(regionId);
- return sendResult;
+ sendResult.setTransactionId(responseHeader.getTransactionId());
+ String regionId =
response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
+ String traceOn =
response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
+ if (regionId == null || regionId.isEmpty()) {
+ regionId = MixAll.DEFAULT_TRACE_REGION_ID;
}
- default:
- break;
+ if (traceOn != null && traceOn.equals("false")) {
+ sendResult.setTraceOn(false);
+ } else {
+ sendResult.setTraceOn(true);
+ }
+ sendResult.setRegionId(regionId);
+ return sendResult;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
@@ -1163,9 +1148,6 @@ public class BrokerOuterAPI {
assert response.getBody() != null;
return RemotingSerializable.decode(response.getBody(),
SyncStateSet.class);
}
- case CONTROLLER_NOT_LEADER: {
- throw new MQBrokerException(response.getCode(), "Controller
leader was changed");
- }
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
@@ -1181,12 +1163,7 @@ public class BrokerOuterAPI {
RemotingCommand response =
this.remotingClient.invokeSync(controllerAddress, request, 3000);
assert response != null;
switch (response.getCode()) {
- case CONTROLLER_NOT_LEADER: {
- throw new MQBrokerException(response.getCode(), "Controller
leader was changed");
- }
- case CONTROLLER_BROKER_NEED_TO_BE_REGISTERED:
- case CONTROLLER_ELECT_MASTER_FAILED:
- throw new MQBrokerException(response.getCode(),
response.getRemark());
+ // Only record success response.
case CONTROLLER_MASTER_STILL_EXIST:
case SUCCESS:
final ElectMasterResponseHeader responseHeader =
(ElectMasterResponseHeader)
response.decodeCommandCustomHeader(ElectMasterResponseHeader.class);
@@ -1248,12 +1225,6 @@ public class BrokerOuterAPI {
final SyncStateSet stateSet =
RemotingSerializable.decode(response.getBody(), SyncStateSet.class);
return new Pair<>(header, stateSet);
}
- case CONTROLLER_NOT_LEADER: {
- throw new MQBrokerException(response.getCode(), "Controller
leader was changed");
- }
- case CONTROLLER_BROKER_METADATA_NOT_EXIST: {
- throw new MQBrokerException(response.getCode(),
response.getRemark());
- }
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}