This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch 5.0.0-beta
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta by this push:
new d8c9bd38c [ISSUE #4270]Log parameter error and optimize code (#4271)
d8c9bd38c is described below
commit d8c9bd38cb69039cee92eb37622130ac5970ef14
Author: zhangjidi2016 <[email protected]>
AuthorDate: Sun May 15 10:45:08 2022 +0800
[ISSUE #4270]Log parameter error and optimize code (#4271)
Co-authored-by: zhangjidi2016 <[email protected]>
---
.../broker/loadbalance/MessageRequestModeManager.java | 10 +++-------
.../rocketmq/broker/processor/QueryAssignmentProcessor.java | 13 ++++++++-----
.../main/java/org/apache/rocketmq/common/BrokerConfig.java | 10 +++++-----
3 files changed, 16 insertions(+), 17 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/MessageRequestModeManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/MessageRequestModeManager.java
index b5472bc21..f3d382a88 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/MessageRequestModeManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/loadbalance/MessageRequestModeManager.java
@@ -23,13 +23,12 @@ import org.apache.rocketmq.common.ConfigManager;
import
org.apache.rocketmq.common.protocol.body.SetMessageRequestModeRequestBody;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-
public class MessageRequestModeManager extends ConfigManager {
private transient BrokerController brokerController;
- private ConcurrentHashMap<String, ConcurrentHashMap<String,
SetMessageRequestModeRequestBody>>
- messageRequestModeMap = new ConcurrentHashMap<String,
ConcurrentHashMap<String, SetMessageRequestModeRequestBody>>();
+ private ConcurrentHashMap<String/*topic*/,
ConcurrentHashMap<String/*consumerGroup*/, SetMessageRequestModeRequestBody>>
+ messageRequestModeMap = new ConcurrentHashMap<String,
ConcurrentHashMap<String, SetMessageRequestModeRequestBody>>();
public MessageRequestModeManager() {
// empty construct for decode
@@ -44,7 +43,7 @@ public class MessageRequestModeManager extends ConfigManager {
if (consumerGroup2ModeMap == null) {
consumerGroup2ModeMap = new ConcurrentHashMap<String,
SetMessageRequestModeRequestBody>();
ConcurrentHashMap<String, SetMessageRequestModeRequestBody> pre =
- messageRequestModeMap.putIfAbsent(topic,
consumerGroup2ModeMap);
+ messageRequestModeMap.putIfAbsent(topic,
consumerGroup2ModeMap);
if (pre != null) {
consumerGroup2ModeMap = pre;
}
@@ -69,19 +68,16 @@ public class MessageRequestModeManager extends
ConfigManager {
this.messageRequestModeMap = messageRequestModeMap;
}
-
@Override
public String encode() {
return this.encode(false);
}
-
@Override
public String configFilePath() {
return
BrokerPathConfigHelper.getMessageRequestModePath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());
}
-
@Override
public void decode(String jsonString) {
if (jsonString != null) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
index d548a356e..7173606f3 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
@@ -23,6 +23,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
@@ -180,7 +182,7 @@ public class QueryAssignmentProcessor implements
NettyRequestProcessor {
return null;
}
- if
(!brokerController.getBrokerConfig().isServerLoadBalancerEnabled()) {
+ if
(!brokerController.getBrokerConfig().isServerLoadBalancerEnable()) {
return mqSet;
}
@@ -203,7 +205,7 @@ public class QueryAssignmentProcessor implements
NettyRequestProcessor {
try {
AllocateMessageQueueStrategy allocateMessageQueueStrategy
= name2LoadStrategy.get(strategyName);
if (null == allocateMessageQueueStrategy) {
- log.warn("QueryLoad: unsupported strategy [{}], {}",
consumerGroup, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ log.warn("QueryLoad: unsupported strategy [{}], {}",
strategyName, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
return null;
}
@@ -269,13 +271,14 @@ public class QueryAssignmentProcessor implements
NettyRequestProcessor {
private List<MessageQueue> allocate(String consumerGroup, String
currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
- if (currentCID == null || currentCID.length() < 1) {
+ if (StringUtils.isBlank(currentCID)) {
throw new IllegalArgumentException("currentCID is empty");
}
- if (mqAll == null || mqAll.isEmpty()) {
+
+ if (CollectionUtils.isEmpty(mqAll)) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
- if (cidAll == null || cidAll.isEmpty()) {
+ if (CollectionUtils.isEmpty(cidAll)) {
throw new IllegalArgumentException("cidAll is null or cidAll
empty");
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 71280ea0b..edb30b576 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -225,7 +225,7 @@ public class BrokerConfig extends BrokerIdentity {
*/
private int cleanOfflineBrokerInterval = 1000 * 30;
- private boolean serverLoadBalancerEnabled = true;
+ private boolean serverLoadBalancerEnable = true;
private MessageRequestMode defaultMessageRequestMode =
MessageRequestMode.PULL;
@@ -1097,12 +1097,12 @@ public class BrokerConfig extends BrokerIdentity {
this.loadBalanceProcessorThreadPoolNums =
loadBalanceProcessorThreadPoolNums;
}
- public boolean isServerLoadBalancerEnabled() {
- return serverLoadBalancerEnabled;
+ public boolean isServerLoadBalancerEnable() {
+ return serverLoadBalancerEnable;
}
- public void setServerLoadBalancerEnabled(boolean
serverLoadBalancerEnabled) {
- this.serverLoadBalancerEnabled = serverLoadBalancerEnabled;
+ public void setServerLoadBalancerEnable(boolean serverLoadBalancerEnable) {
+ this.serverLoadBalancerEnable = serverLoadBalancerEnable;
}
public MessageRequestMode getDefaultMessageRequestMode() {