ROCKETMQ-18 Clean code closes apache/incubator-rocketmq#21
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/167cce03 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/167cce03 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/167cce03 Branch: refs/heads/master Commit: 167cce03480eb3abc05048ebbb023cbd6d243b62 Parents: 6672256 Author: dongeforever <[email protected]> Authored: Thu Dec 29 15:25:05 2016 +0800 Committer: lollipop <[email protected]> Committed: Thu Dec 29 15:25:05 2016 +0800 ---------------------------------------------------------------------- .../rocketmq/broker/BrokerController.java | 2 +- .../apache/rocketmq/broker/BrokerStartup.java | 10 ++--- .../broker/client/ConsumerGroupInfo.java | 4 +- .../rocketmq/broker/client/ConsumerManager.java | 2 +- .../rocketmq/broker/client/ProducerManager.java | 2 +- .../broker/client/net/Broker2Client.java | 2 +- .../client/rebalance/RebalanceLockManager.java | 4 +- .../broker/filtersrv/FilterServerManager.java | 2 +- .../broker/filtersrv/FilterServerUtil.java | 4 +- .../broker/latency/BrokerFastFailure.java | 1 - .../broker/longpolling/ManyPullRequest.java | 2 +- .../longpolling/PullRequestHoldService.java | 9 ++-- .../rocketmq/broker/out/BrokerOuterAPI.java | 6 +-- .../processor/AbstractSendMessageProcessor.java | 8 ++-- .../broker/processor/AdminBrokerProcessor.java | 10 ++--- .../broker/processor/PullMessageProcessor.java | 46 +++++++++----------- .../broker/processor/QueryMessageProcessor.java | 2 +- .../broker/processor/SendMessageProcessor.java | 2 +- .../rocketmq/broker/slave/SlaveSynchronize.java | 18 ++++---- .../subscription/SubscriptionGroupManager.java | 8 ++-- .../broker/topic/TopicConfigManager.java | 19 ++++---- .../client/impl/ClientRemotingProcessor.java | 3 +- .../rocketmq/client/impl/MQClientAPIImpl.java | 21 ++++----- .../rocketmq/client/impl/MQClientManager.java | 5 ++- .../consumer/DefaultMQPullConsumerImpl.java | 10 ++--- .../consumer/DefaultMQPushConsumerImpl.java | 6 +-- .../client/impl/consumer/ProcessQueue.java | 6 +-- .../client/impl/factory/MQClientInstance.java | 21 ++++----- .../client/latency/MQFaultStrategy.java | 4 ++ .../java/org/apache/rocketmq/common/MixAll.java | 34 ++++++--------- .../org/apache/rocketmq/common/UtilAll.java | 15 ++++--- .../rocketmq/common/filter/FilterAPI.java | 2 +- .../rocketmq/common/message/MessageDecoder.java | 20 +++------ .../common/protocol/MQProtosHelper.java | 11 +---- .../common/stats/MomentStatsItemSet.java | 2 +- .../apache/rocketmq/common/stats/StatsItem.java | 6 +-- .../rocketmq/common/stats/StatsItemSet.java | 12 ++--- .../org/apache/rocketmq/common/MixAllTest.java | 5 ++- .../rocketmq/common/RemotingUtilTest.java | 3 +- .../common/protocol/ConsumeStatusTest.java | 11 +++-- .../rocketmq/example/benchmark/Producer.java | 6 +-- .../example/benchmark/TransactionProducer.java | 4 +- .../rocketmq/example/ordermessage/Producer.java | 8 +--- .../transaction/TransactionProducer.java | 4 +- .../namesrv/routeinfo/RouteInfoManager.java | 6 +-- .../rocketmq/remoting/common/RemotingUtil.java | 4 +- .../store/AllocateMappedFileService.java | 2 +- .../org/apache/rocketmq/store/CommitLog.java | 6 +-- .../rocketmq/store/DefaultMessageStore.java | 6 +-- .../org/apache/rocketmq/store/StoreUtil.java | 2 +- .../rocketmq/store/index/IndexService.java | 4 +- .../broker/BrokerConsumeStatsSubCommad.java | 2 +- .../cluster/CLusterSendMsgRTCommand.java | 3 +- .../command/cluster/ClusterListSubCommand.java | 25 ++++++----- .../DeleteSubscriptionGroupCommand.java | 1 + .../consumer/StartMonitoringSubCommand.java | 1 - .../message/PrintMessageByQueueCommand.java | 7 +-- .../command/message/PrintMessageSubCommand.java | 3 +- .../rocketmq/tools/command/message/Store.java | 2 - .../command/namesrv/DeleteKvConfigCommand.java | 1 - .../namesrv/GetNamesrvConfigCommand.java | 3 +- .../command/namesrv/UpdateKvConfigCommand.java | 1 - .../namesrv/UpdateNamesrvConfigCommand.java | 3 +- .../command/offset/CloneGroupOffsetCommand.java | 2 +- 64 files changed, 208 insertions(+), 258 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 4fa3e21..9b89c85 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -497,7 +497,7 @@ public class BrokerController { long diff = this.messageStore.slaveFallBehindMuch(); // XXX: warn and notify me - log.info("slave fall behind master, how much, {} bytes", diff); + log.info("Slave fall behind master: {} bytes", diff); } public Broker2Client getBroker2Client() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java index e5d0431..5b15d79 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java @@ -148,10 +148,8 @@ public class BrokerStartup { if (null != namesrvAddr) { try { String[] addrArray = namesrvAddr.split(";"); - if (addrArray != null) { - for (String addr : addrArray) { - RemotingUtil.string2SocketAddress(addr); - } + for (String addr : addrArray) { + RemotingUtil.string2SocketAddress(addr); } } catch (Exception e) { System.out.printf( @@ -211,13 +209,13 @@ public class BrokerStartup { @Override public void run() { synchronized (this) { - log.info("shutdown hook was invoked, " + this.shutdownTimes.incrementAndGet()); + log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet()); if (!this.hasShutdown) { this.hasShutdown = true; long begineTime = System.currentTimeMillis(); controller.shutdown(); long consumingTimeTotal = System.currentTimeMillis() - begineTime; - log.info("shutdown hook over, consuming time total(ms): " + consumingTimeTotal); + log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java index 7e9c496..6ce542a 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerGroupInfo.java @@ -72,7 +72,7 @@ public class ConsumerGroupInfo { } public List<Channel> getAllChannel() { - List<Channel> result = new ArrayList<Channel>(); + List<Channel> result = new ArrayList<>(); result.addAll(this.channelInfoTable.keySet()); @@ -80,7 +80,7 @@ public class ConsumerGroupInfo { } public List<String> getAllClientId() { - List<String> result = new ArrayList<String>(); + List<String> result = new ArrayList<>(); Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index 10d43b3..a2d88d5 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -167,7 +167,7 @@ public class ConsumerManager { } public HashSet<String> queryTopicConsumeByWho(final String topic) { - HashSet<String> groups = new HashSet<String>(); + HashSet<String> groups = new HashSet<>(); Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConsumerGroupInfo> entry = it.next(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java index b4dc305..010c1ae 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java @@ -133,7 +133,7 @@ public class ProducerManager { try { HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group); if (null == channelTable) { - channelTable = new HashMap<Channel, ClientChannelInfo>(); + channelTable = new HashMap<>(); this.groupChannelTable.put(group, channelTable); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java index 1fdf3db..c00898c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java @@ -230,7 +230,7 @@ public class Broker2Client { } private List<MessageQueueForC> convertOffsetTable2OffsetList(Map<MessageQueue, Long> table) { - List<MessageQueueForC> list = new ArrayList<MessageQueueForC>(); + List<MessageQueueForC> list = new ArrayList<>(); for (Entry<MessageQueue, Long> entry : table.entrySet()) { MessageQueue mq = entry.getKey(); MessageQueueForC tmp = http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java index 426fcf2..98aceb6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/rebalance/RebalanceLockManager.java @@ -42,7 +42,7 @@ public class RebalanceLockManager { try { ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); if (null == groupValue) { - groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32); + groupValue = new ConcurrentHashMap<>(32); this.mqLockTable.put(group, groupValue); } @@ -132,7 +132,7 @@ public class RebalanceLockManager { try { ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group); if (null == groupValue) { - groupValue = new ConcurrentHashMap<MessageQueue, LockEntry>(32); + groupValue = new ConcurrentHashMap<>(32); this.mqLockTable.put(group, groupValue); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java index 35b6dc4..b935bc8 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerManager.java @@ -137,7 +137,7 @@ public class FilterServerManager { } public List<String> buildNewFilterServerList() { - List<String> addr = new ArrayList<String>(); + List<String> addr = new ArrayList<>(); Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator(); while (it.hasNext()) { Entry<Channel, FilterServerInfo> next = it.next(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java index 818b238..5b142c1 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/filtersrv/FilterServerUtil.java @@ -26,9 +26,9 @@ public class FilterServerUtil { String[] cmdArray = splitShellString(shellString); process = Runtime.getRuntime().exec(cmdArray); process.waitFor(); - log.info("callShell: <{}> OK", shellString); + log.info("CallShell: <{}> OK", shellString); } catch (Throwable e) { - log.error("callShell: readLine IOException, " + shellString, e); + log.error("CallShell: readLine IOException, {}", shellString, e); } finally { if (null != process) process.destroy(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index c004d1b..d7d1276 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -103,7 +103,6 @@ public class BrokerFastFailure { } } } - public void shutdown() { this.scheduledExecutorService.shutdown(); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java index d7c6e6e..d956c22 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/ManyPullRequest.java @@ -20,7 +20,7 @@ import java.util.ArrayList; import java.util.List; public class ManyPullRequest { - private final ArrayList<PullRequest> pullRequestList = new ArrayList<PullRequest>(); + private final ArrayList<PullRequest> pullRequestList = new ArrayList<>(); public synchronized void addPullRequest(final PullRequest pullRequest) { this.pullRequestList.add(pullRequest); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java index 2579652..ff068d2 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java @@ -65,7 +65,7 @@ public class PullRequestHoldService extends ServiceThread { @Override public void run() { - log.info(this.getServiceName() + " service started"); + log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { try { if (this.brokerController.getBrokerConfig().isLongPollingEnable()) { @@ -85,7 +85,7 @@ public class PullRequestHoldService extends ServiceThread { } } - log.info(this.getServiceName() + " service end"); + log.info("{} service end", this.getServiceName()); } @Override @@ -96,7 +96,7 @@ public class PullRequestHoldService extends ServiceThread { private void checkHoldRequest() { for (String key : this.pullRequestTable.keySet()) { String[] kArray = key.split(TOPIC_QUEUEID_SEPARATOR); - if (kArray != null && 2 == kArray.length) { + if (2 == kArray.length) { String topic = kArray[0]; int queueId = Integer.parseInt(kArray[1]); final long offset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId); @@ -127,9 +127,8 @@ public class PullRequestHoldService extends ServiceThread { newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId); } - Long tmp = tagsCode; if (newestOffset > request.getPullFromThisOffset()) { - if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tmp)) { + if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) { try { this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 3fc4605..8726c69 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -75,7 +75,7 @@ public class BrokerOuterAPI { String addrs = this.topAddressing.fetchNSAddr(); if (addrs != null) { if (!addrs.equals(this.nameSrvAddr)) { - log.info("name server address changed, old: " + this.nameSrvAddr + " new: " + addrs); + log.info("name server address changed, old: {} new: {}", this.nameSrvAddr, addrs); this.updateNameServerAddressList(addrs); this.nameSrvAddr = addrs; return nameSrvAddr; @@ -121,7 +121,7 @@ public class BrokerOuterAPI { log.info("register broker to name server {} OK", namesrvAddr); } catch (Exception e) { - log.warn("registerBroker Exception, " + namesrvAddr, e); + log.warn("registerBroker Exception, {}", namesrvAddr, e); } } } @@ -199,7 +199,7 @@ public class BrokerOuterAPI { this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId); log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr); } catch (Exception e) { - log.warn("unregisterBroker Exception, " + namesrvAddr, e); + log.warn("unregisterBroker Exception, {}", namesrvAddr, e); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index fc6e132..f5dc1f9 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -139,13 +139,12 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces final SendMessageRequestHeader requestHeader, RemotingCommand request, final RemotingCommand response) { if (requestHeader.getTopic().length() > Byte.MAX_VALUE) { - log.warn("putMessage message topic length too long " + requestHeader.getTopic().length()); + log.warn("putMessage message topic length too long {}", requestHeader.getTopic().length()); response.setCode(ResponseCode.MESSAGE_ILLEGAL); return response; } if (requestHeader.getProperties() != null && requestHeader.getProperties().length() > Short.MAX_VALUE) { - log.warn("putMessage message properties length too long " - + requestHeader.getProperties().length()); + log.warn("putMessage message properties length too long {}", requestHeader.getProperties().length()); response.setCode(ResponseCode.MESSAGE_ILLEGAL); return response; } @@ -188,8 +187,7 @@ public abstract class AbstractSendMessageProcessor implements NettyRequestProces } } - log.warn("the topic " + requestHeader.getTopic() + " not exist, producer: " - + ctx.channel().remoteAddress()); + log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress()); topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(// requestHeader.getTopic(), // requestHeader.getDefaultTopic(), // http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 286ecbe..8bf48ac 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -267,7 +267,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } } else { - log.error("No topic in this broker, client: " + ctx.channel().remoteAddress()); + log.error("No topic in this broker, client: {}", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No topic in this broker"); return response; @@ -290,7 +290,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { String bodyStr = new String(body, MixAll.DEFAULT_CHARSET); Properties properties = MixAll.string2Properties(bodyStr); if (properties != null) { - log.info("updateBrokerConfig, new config: " + properties + " client: " + ctx.channel().remoteAddress()); + log.info("updateBrokerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress()); this.brokerController.getConfiguration().update(properties); if (properties.containsKey("brokerPermission")) { this.brokerController.registerBrokerAll(false, false); @@ -476,7 +476,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } } else { - log.error("No subscription group in this broker, client: " + ctx.channel().remoteAddress()); + log.error("No subscription group in this broker, client:{} ", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No subscription group in this broker"); return response; @@ -718,7 +718,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } } else { - log.error("No consumer offset in this broker, client: " + ctx.channel().remoteAddress()); + log.error("No consumer offset in this broker, client: {} ", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No consumer offset in this broker"); return response; @@ -745,7 +745,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { return response; } } else { - log.error("No delay offset in this broker, client: " + ctx.channel().remoteAddress()); + log.error("No delay offset in this broker, client: {} ", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No delay offset in this broker"); return response; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index be1199a..382030b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -88,12 +88,12 @@ public class PullMessageProcessor implements NettyRequestProcessor { response.setOpaque(request.getOpaque()); if (LOG.isDebugEnabled()) { - LOG.debug("receive PullMessage request command, " + request); + LOG.debug("receive PullMessage request command, {}", request); } if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) { response.setCode(ResponseCode.NO_PERMISSION); - response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] pulling message is forbidden"); + response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1())); return response; } @@ -101,8 +101,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); - response.setRemark("subscription group not exist, " + requestHeader.getConsumerGroup() + " " - + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); + response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST))); return response; } @@ -120,10 +119,9 @@ public class PullMessageProcessor implements NettyRequestProcessor { TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (null == topicConfig) { - LOG.error("the topic " + requestHeader.getTopic() + " not exist, consumer: " + RemotingHelper.parseChannelRemoteAddr(channel)); + LOG.error("The topic {} not exist, consumer: {} ", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel)); response.setCode(ResponseCode.TOPIC_NOT_EXIST); - response.setRemark( - "topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); + response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL))); return response; } @@ -134,8 +132,8 @@ public class PullMessageProcessor implements NettyRequestProcessor { } if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) { - String errorInfo = "queueId[" + requestHeader.getQueueId() + "] is illagal,Topic :" + requestHeader.getTopic() - + " topicConfig.readQueueNums: " + topicConfig.getReadQueueNums() + " consumer: " + channel.remoteAddress(); + String errorInfo = String.format("queueId[%d] is illagal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]", + requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress()); LOG.warn(errorInfo); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorInfo); @@ -148,8 +146,8 @@ public class PullMessageProcessor implements NettyRequestProcessor { subscriptionData = FilterAPI.buildSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getSubscription()); } catch (Exception e) { - LOG.warn("parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), // - requestHeader.getConsumerGroup()); + LOG.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(), // + requestHeader.getConsumerGroup()); response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED); response.setRemark("parse the consumer's subscription failed"); return response; @@ -158,7 +156,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()); if (null == consumerGroupInfo) { - LOG.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup()); + LOG.warn("The consumer's group info not exist, group: {}", requestHeader.getConsumerGroup()); response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST); response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC)); return response; @@ -173,15 +171,15 @@ public class PullMessageProcessor implements NettyRequestProcessor { subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic()); if (null == subscriptionData) { - LOG.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic()); + LOG.warn("The consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic()); response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST); response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC)); return response; } if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) { - LOG.warn("the broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(), - subscriptionData.getSubString()); + LOG.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(), + subscriptionData.getSubString()); response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST); response.setRemark("the consumer's subscription not latest"); return response; @@ -261,15 +259,14 @@ public class PullMessageProcessor implements NettyRequestProcessor { case OFFSET_OVERFLOW_BADLY: response.setCode(ResponseCode.PULL_OFFSET_MOVED); // XXX: warn and notify me - LOG.info("the request offset: " + requestHeader.getQueueOffset() + " over flow badly, broker max offset: " - + getMessageResult.getMaxOffset() + ", consumer: " + channel.remoteAddress()); + LOG.info("The request offset:{} over flow badly, broker max offset:{} , consumer: {}", requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress()); break; case OFFSET_OVERFLOW_ONE: response.setCode(ResponseCode.PULL_NOT_FOUND); break; case OFFSET_TOO_SMALL: response.setCode(ResponseCode.PULL_OFFSET_MOVED); - LOG.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}", + LOG.info("The request offset is too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}", requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(), getMessageResult.getMinOffset(), channel.remoteAddress()); break; @@ -346,12 +343,12 @@ public class PullMessageProcessor implements NettyRequestProcessor { public void operationComplete(ChannelFuture future) throws Exception { getMessageResult.release(); if (!future.isSuccess()) { - LOG.error("transfer many message by pagecache failed, " + channel.remoteAddress(), future.cause()); + LOG.error("Fail to transfer messages from page cache to {}", channel.remoteAddress(), future.cause()); } } }); } catch (Throwable e) { - LOG.error("transfer many message by pagecache exception", e); + LOG.error("Error occurred when transferring messages from page cache", e); getMessageResult.release(); } @@ -480,7 +477,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); } catch (Exception e) { - LOG.warn(String.format("generateOffsetMovedEvent Exception, %s", event.toString()), e); + LOG.warn(String.format("GenerateOffsetMovedEvent Exception, %s", event.toString()), e); } } @@ -499,21 +496,20 @@ public class PullMessageProcessor implements NettyRequestProcessor { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { - LOG.error("processRequestWrapper response to " + future.channel().remoteAddress() + " failed", - future.cause()); + LOG.error("ProcessRequestWrapper response to {} failed", future.channel().remoteAddress(), future.cause()); LOG.error(request.toString()); LOG.error(response.toString()); } } }); } catch (Throwable e) { - LOG.error("processRequestWrapper process request over, but response failed", e); + LOG.error("ProcessRequestWrapper process request over, but response failed", e); LOG.error(request.toString()); LOG.error(response.toString()); } } } catch (RemotingCommandException e1) { - LOG.error("executeRequestWhenWakeup run", e1); + LOG.error("ExecuteRequestWhenWakeup run", e1); } } }; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java index 6a20774..e8f97d0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryMessageProcessor.java @@ -146,7 +146,7 @@ public class QueryMessageProcessor implements NettyRequestProcessor { public void operationComplete(ChannelFuture future) throws Exception { selectMappedBufferResult.release(); if (!future.isSuccess()) { - log.error("transfer one message by page cache failed, ", future.cause()); + log.error("Transfer one message from page cache failed, ", future.cause()); } } }); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index b1d24db..a440462 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -252,7 +252,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); if (log.isDebugEnabled()) { - log.debug("receive SendMessage request command, " + request); + log.debug("receive SendMessage request command, {}", request); } final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java index ecf8424..44c8264 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java @@ -68,10 +68,10 @@ public class SlaveSynchronize { .putAll(topicWrapper.getTopicConfigTable()); this.brokerController.getTopicConfigManager().persist(); - log.info("update slave topic config from master, {}", masterAddrBak); + log.info("Update slave topic config from master, {}", masterAddrBak); } } catch (Exception e) { - log.error("syncTopicConfig Exception, " + masterAddrBak, e); + log.error("SyncTopicConfig Exception, {}", masterAddrBak, e); } } } @@ -85,9 +85,9 @@ public class SlaveSynchronize { this.brokerController.getConsumerOffsetManager().getOffsetTable() .putAll(offsetWrapper.getOffsetTable()); this.brokerController.getConsumerOffsetManager().persist(); - log.info("update slave consumer offset from master, {}", masterAddrBak); + log.info("Update slave consumer offset from master, {}", masterAddrBak); } catch (Exception e) { - log.error("syncConsumerOffset Exception, " + masterAddrBak, e); + log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e); } } } @@ -106,12 +106,12 @@ public class SlaveSynchronize { try { MixAll.string2File(delayOffset, fileName); } catch (IOException e) { - log.error("persist file Exception, " + fileName, e); + log.error("Persist file Exception, {}", fileName, e); } } - log.info("update slave delay offset from master, {}", masterAddrBak); + log.info("Update slave delay offset from master, {}", masterAddrBak); } catch (Exception e) { - log.error("syncDelayOffset Exception, " + masterAddrBak, e); + log.error("SyncDelayOffset Exception, {}", masterAddrBak, e); } } } @@ -134,10 +134,10 @@ public class SlaveSynchronize { subscriptionGroupManager.getSubscriptionGroupTable().putAll( subscriptionWrapper.getSubscriptionGroupTable()); subscriptionGroupManager.persist(); - log.info("update slave Subscription Group from master, {}", masterAddrBak); + log.info("Update slave Subscription Group from master, {}", masterAddrBak); } } catch (Exception e) { - log.error("syncSubscriptionGroup Exception, " + masterAddrBak, e); + log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java index b661339..4b6072c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/subscription/SubscriptionGroupManager.java @@ -99,9 +99,9 @@ public class SubscriptionGroupManager extends ConfigManager { public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config) { SubscriptionGroupConfig old = this.subscriptionGroupTable.put(config.getGroupName(), config); if (old != null) { - log.info("update subscription group config, old: " + old + " new: " + config); + log.info("update subscription group config, old: {} new: {}", old, config); } else { - log.info("create new subscription group, " + config); + log.info("create new subscription group, {}", config); } this.dataVersion.nextVersion(); @@ -181,11 +181,11 @@ public class SubscriptionGroupManager extends ConfigManager { public void deleteSubscriptionGroupConfig(final String groupName) { SubscriptionGroupConfig old = this.subscriptionGroupTable.remove(groupName); if (old != null) { - log.info("delete subscription group OK, subscription group: " + old); + log.info("delete subscription group OK, subscription group:{}", old); this.dataVersion.nextVersion(); this.persist(); } else { - log.warn("delete subscription group failed, subscription group: " + old + " not exist"); + log.warn("delete subscription group failed, subscription group: {} not exist", old); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index df2231d..d31ad4b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -181,18 +181,15 @@ public class TopicConfigManager extends ConfigManager { topicConfig.setTopicSysFlag(topicSysFlag); topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType()); } else { - LOG.warn("create new topic failed, because the default topic[" + defaultTopic - + "] no perm, " + defaultTopicConfig.getPerm() + " producer: " - + remoteAddress); + LOG.warn("Create new topic failed, because the default topic[{}] has no perm [{}] producer:[{}]", + defaultTopic, defaultTopicConfig.getPerm(), remoteAddress); } } else { - LOG.warn("create new topic failed, because the default topic[" + defaultTopic - + "] not exist." + " producer: " + remoteAddress); + LOG.warn("Create new topic failed, because the default topic[{}] not exist. producer:[{}]", defaultTopic, remoteAddress); } if (topicConfig != null) { - LOG.info("create new topic by default topic[" + defaultTopic + "], " + topicConfig - + " producer: " + remoteAddress); + LOG.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]", defaultTopic, topicConfig, remoteAddress); this.topicConfigTable.put(topic, topicConfig); @@ -307,9 +304,9 @@ public class TopicConfigManager extends ConfigManager { public void updateTopicConfig(final TopicConfig topicConfig) { TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); if (old != null) { - LOG.info("update topic config, old: " + old + " new: " + topicConfig); + LOG.info("update topic config, old:[{}] new:[{}]", old, topicConfig); } else { - LOG.info("create new topic, " + topicConfig); + LOG.info("create new topic [{}]", topicConfig); } this.dataVersion.nextVersion(); @@ -362,11 +359,11 @@ public class TopicConfigManager extends ConfigManager { public void deleteTopicConfig(final String topic) { TopicConfig old = this.topicConfigTable.remove(topic); if (old != null) { - LOG.info("delete topic config OK, topic: " + old); + LOG.info("Delete topic config OK, topic:{}", old); this.dataVersion.nextVersion(); this.persist(); } else { - LOG.warn("delete topic config failed, topic: " + topic + " not exist"); + LOG.warn("Delete topic config failed, topic:{} not exist", topic); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java index ebf0734..2aadc89 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java @@ -125,9 +125,8 @@ public class ClientRemotingProcessor implements NettyRequestProcessor { final ResetOffsetRequestHeader requestHeader = (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); log.info("invoke reset offset operation from broker. brokerAddr={}, topic={}, group={}, timestamp={}", - new Object[] { RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), - requestHeader.getTimestamp()}); + requestHeader.getTimestamp()); Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>(); if (request.getBody() != null) { ResetOffsetBody body = ResetOffsetBody.decode(request.getBody(), ResetOffsetBody.class); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 3bacd5d..12580c1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -212,13 +212,11 @@ public class MQClientAPIImpl { public void updateNameServerAddressList(final String addrs) { List<String> lst = new ArrayList<String>(); String[] addrArray = addrs.split(";"); - if (addrArray != null) { - for (String addr : addrArray) { - lst.add(addr); - } - - this.remotingClient.updateNameServerAddressList(lst); + for (String addr : addrArray) { + lst.add(addr); } + + this.remotingClient.updateNameServerAddressList(lst); } public void start() { @@ -468,7 +466,7 @@ public class MQClientAPIImpl { } try { sendCallback.onException(e); - } catch (Exception e2) { + } catch (Exception ignored) { } } } @@ -1074,8 +1072,7 @@ public class MQClientAPIImpl { request, timeoutMillis); switch (response.getCode()) { case ResponseCode.SUCCESS: { - ConsumerConnection consumerConnection = ConsumerConnection.decode(response.getBody(), ConsumerConnection.class); - return consumerConnection; + return ConsumerConnection.decode(response.getBody(), ConsumerConnection.class); } default: break; @@ -1151,8 +1148,7 @@ public class MQClientAPIImpl { assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { - ClusterInfo responseBody = ClusterInfo.decode(response.getBody(), ClusterInfo.class); - return responseBody; + return ClusterInfo.decode(response.getBody(), ClusterInfo.class); } default: break; @@ -1226,8 +1222,7 @@ public class MQClientAPIImpl { case ResponseCode.SUCCESS: { byte[] body = response.getBody(); if (body != null) { - TopicList topicList = TopicList.decode(body, TopicList.class); - return topicList; + return TopicList.decode(body, TopicList.class); } } default: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java index ee55d92..6f2c9a3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java @@ -20,9 +20,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.remoting.RPCHook; +import org.slf4j.Logger; public class MQClientManager { + private final static Logger log = ClientLogger.getLog(); private static MQClientManager instance = new MQClientManager(); private AtomicInteger factoryIndexGenerator = new AtomicInteger(); private ConcurrentHashMap<String/* clientId */, MQClientInstance> factoryTable = @@ -51,7 +54,7 @@ public class MQClientManager { if (prev != null) { instance = prev; } else { - // TODO log + log.warn("Previous MQClientInstance has created for clientId:[{}]", clientId); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java index bbc705e..b26d062 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java @@ -227,7 +227,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), // topic, SubscriptionData.SUB_ALL); this.rebalanceImpl.subscriptionInner.putIfAbsent(topic, subscriptionData); - } catch (Exception e) { + } catch (Exception ignore) { } } } @@ -246,7 +246,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { for (ConsumeMessageHook hook : this.consumeMessageHookList) { try { hook.consumeMessageBefore(context); - } catch (Throwable e) { + } catch (Throwable ignored) { } } } @@ -257,7 +257,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { for (ConsumeMessageHook hook : this.consumeMessageHookList) { try { hook.consumeMessageAfter(context); - } catch (Throwable e) { + } catch (Throwable ignored) { } } } @@ -314,9 +314,7 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner { this.makeSureStateOK(); Set<MessageQueue> mqs = new HashSet<MessageQueue>(); Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); - if (allocateMq != null) { - mqs.addAll(allocateMq); - } + mqs.addAll(allocateMq); this.offsetStore.persistAll(mqs); } catch (Exception e) { log.error("group: " + this.defaultMQPullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java index 1a39998..4f33732 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java @@ -921,7 +921,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { @Override public void doRebalance() { - if (this.rebalanceImpl != null && !this.pause) { + if (!this.pause) { this.rebalanceImpl.doRebalance(this.isConsumeOrderly()); } } @@ -932,9 +932,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner { this.makeSureStateOK(); Set<MessageQueue> mqs = new HashSet<MessageQueue>(); Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); - if (allocateMq != null) { - mqs.addAll(allocateMq); - } + mqs.addAll(allocateMq); this.offsetStore.persistAll(mqs); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index 93d4cf9..38b8073 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -58,13 +58,11 @@ public class ProcessQueue { private volatile long msgAccCnt = 0; public boolean isLockExpired() { - boolean result = (System.currentTimeMillis() - this.lastLockTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME; - return result; + return (System.currentTimeMillis() - this.lastLockTimestamp) > REBALANCE_LOCK_MAX_LIVE_TIME; } public boolean isPullExpired() { - boolean result = (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME; - return result; + return (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME; } /** http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index c1944d8..1343e76 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -472,7 +472,7 @@ public class MQClientInstance { final boolean producerEmpty = heartbeatData.getProducerDataSet().isEmpty(); final boolean consumerEmpty = heartbeatData.getConsumerDataSet().isEmpty(); if (producerEmpty && consumerEmpty) { - log.warn("sending hearbeat, but no consumer and no producer"); + log.warn("sending heartbeat, but no consumer and no producer"); return; } @@ -841,13 +841,8 @@ public class MQClientInstance { if (addr != null) { try { this.mQClientAPIImpl.unregisterClient(addr, this.clientId, producerGroup, consumerGroup, 3000); - log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, - consumerGroup, brokerName, entry1.getKey(), addr); - } catch (RemotingException e) { - log.error("unregister client exception from broker: " + addr, e); - } catch (MQBrokerException e) { - log.error("unregister client exception from broker: " + addr, e); - } catch (InterruptedException e) { + log.info("unregister client[Producer: {} Consumer: {}] from broker[{} {} {}] success", producerGroup, consumerGroup, brokerName, entry1.getKey(), addr); + } catch (RemotingException | InterruptedException | MQBrokerException e) { log.error("unregister client exception from broker: " + addr, e); } } @@ -1064,7 +1059,9 @@ public class MQClientInstance { } } } finally { - consumer.resume(); + if (consumer != null) { + consumer.resume(); + } } } @@ -1134,14 +1131,14 @@ public class MQClientInstance { List<String> nsList = this.mQClientAPIImpl.getRemotingClient().getNameServerAddressList(); - StringBuffer strBuffer = new StringBuffer(); + StringBuilder strBuilder = new StringBuilder(); if (nsList != null) { for (String addr : nsList) { - strBuffer.append(addr + ";"); + strBuilder.append(addr).append(";"); } } - String nsAddr = strBuffer.toString(); + String nsAddr = strBuilder.toString(); consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_NAMESERVER_ADDR, nsAddr); consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CONSUME_TYPE, mqConsumerInner.consumeType().name()); consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CLIENT_VERSION, http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java index 206b0a3..235aa20 100644 --- a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java +++ b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java @@ -18,9 +18,12 @@ package org.apache.rocketmq.client.latency; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; public class MQFaultStrategy { + private final static Logger log = ClientLogger.getLog(); private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); private boolean sendLatencyFaultEnable = false; @@ -80,6 +83,7 @@ public class MQFaultStrategy { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { + log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/common/src/main/java/org/apache/rocketmq/common/MixAll.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index e9e19b6..22ed96a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -137,7 +137,7 @@ public class MixAll { return Math.abs(value); } - public static final void string2File(final String str, final String fileName) throws IOException { + public static void string2File(final String str, final String fileName) throws IOException { String tmpFile = fileName + ".tmp"; string2FileNotSafe(str, tmpFile); @@ -155,7 +155,8 @@ public class MixAll { file.renameTo(new File(fileName)); } - public static final void string2FileNotSafe(final String str, final String fileName) throws IOException { + + public static void string2FileNotSafe(final String str, final String fileName) throws IOException { File file = new File(fileName); File fileParent = file.getParentFile(); if (fileParent != null) { @@ -170,21 +171,17 @@ public class MixAll { throw e; } finally { if (fileWriter != null) { - try { - fileWriter.close(); - } catch (IOException e) { - throw e; - } + fileWriter.close(); } } } - public static final String file2String(final String fileName) { + public static String file2String(final String fileName) { File file = new File(fileName); return file2String(file); } - public static final String file2String(final File file) { + public static String file2String(final File file) { if (file.exists()) { char[] data = new char[(int) file.length()]; boolean result = false; @@ -213,7 +210,7 @@ public class MixAll { return null; } - public static final String file2String(final URL url) { + public static String file2String(final URL url) { InputStream in = null; try { URLConnection urlConnection = url.openConnection(); @@ -223,12 +220,12 @@ public class MixAll { byte[] data = new byte[len]; in.read(data, 0, len); return new String(data, "UTF-8"); - } catch (Exception e) { + } catch (Exception ignored) { } finally { if (null != in) { try { in.close(); - } catch (IOException e) { + } catch (IOException ignored) { } } } @@ -258,9 +255,7 @@ public class MixAll { if (null == value) { value = ""; } - } catch (IllegalArgumentException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { + } catch (IllegalArgumentException | IllegalAccessException e) { e.printStackTrace(); } @@ -273,7 +268,6 @@ public class MixAll { if (log != null) { log.info(name + "=" + value); - } else { } } } @@ -318,9 +312,7 @@ public class MixAll { try { field.setAccessible(true); value = field.get(object); - } catch (IllegalArgumentException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { + } catch (IllegalArgumentException | IllegalAccessException e) { e.printStackTrace(); } @@ -365,10 +357,10 @@ public class MixAll { } else { continue; } - method.invoke(object, new Object[] {arg}); + method.invoke(object, arg); } } - } catch (Throwable e) { + } catch (Throwable ignored) { } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/common/src/main/java/org/apache/rocketmq/common/UtilAll.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index 5a81b1b..56015b3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -79,7 +79,7 @@ public class UtilAll { public static boolean isItTimeToDo(final String when) { String[] whiles = when.split(";"); - if (whiles != null && whiles.length > 0) { + if (whiles.length > 0) { Calendar now = Calendar.getInstance(); for (String w : whiles) { int nowHour = Integer.parseInt(w); @@ -186,6 +186,7 @@ public class UtilAll { if (!file.exists()) { boolean result = file.mkdirs(); if (!result) { + //TO DO } } @@ -202,7 +203,8 @@ public class UtilAll { return -1; } - public static final int crc32(byte[] array) { + + public static int crc32(byte[] array) { if (array != null) { return crc32(array, 0, array.length); } @@ -210,7 +212,8 @@ public class UtilAll { return 0; } - public static final int crc32(byte[] array, int offset, int length) { + + public static int crc32(byte[] array, int offset, int length) { CRC32 crc32 = new CRC32(); crc32.update(array, offset, length); return (int) (crc32.getValue() & 0x7FFFFFFF); @@ -267,15 +270,15 @@ public class UtilAll { } finally { try { byteArrayInputStream.close(); - } catch (IOException e) { + } catch (IOException ignored) { } try { inflaterInputStream.close(); - } catch (IOException e) { + } catch (IOException ignored) { } try { byteArrayOutputStream.close(); - } catch (IOException e) { + } catch (IOException ignored) { } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java b/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java index 2097bfa..e9bf3fa 100644 --- a/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java +++ b/common/src/main/java/org/apache/rocketmq/common/filter/FilterAPI.java @@ -46,7 +46,7 @@ public class FilterAPI { subscriptionData.setSubString(SubscriptionData.SUB_ALL); } else { String[] tags = subString.split("\\|\\|"); - if (tags != null && tags.length > 0) { + if (tags.length > 0) { for (String tag : tags) { if (tag.length() > 0) { String trimString = tag.trim(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java index efa952e..4f4e158 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java @@ -16,19 +16,19 @@ */ package org.apache.rocketmq.common.message; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.sysflag.MessageSysFlag; + import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; -import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.rocketmq.common.UtilAll; -import org.apache.rocketmq.common.sysflag.MessageSysFlag; public class MessageDecoder { public final static int MSG_ID_LENGTH = 8 + 8; @@ -318,10 +318,6 @@ public class MessageDecoder { } return msgExt; - } catch (UnknownHostException e) { - byteBuffer.position(byteBuffer.limit()); - } catch (BufferUnderflowException e) { - byteBuffer.position(byteBuffer.limit()); } catch (Exception e) { byteBuffer.position(byteBuffer.limit()); } @@ -366,12 +362,10 @@ public class MessageDecoder { Map<String, String> map = new HashMap<String, String>(); if (properties != null) { String[] items = properties.split(String.valueOf(PROPERTY_SEPARATOR)); - if (items != null) { - for (String i : items) { - String[] nv = i.split(String.valueOf(NAME_VALUE_SEPARATOR)); - if (nv != null && 2 == nv.length) { - map.put(nv[0], nv[1]); - } + for (String i : items) { + String[] nv = i.split(String.valueOf(NAME_VALUE_SEPARATOR)); + if (2 == nv.length) { + map.put(nv[0], nv[1]); } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java b/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java index 7b875d0..bff7333 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/MQProtosHelper.java @@ -19,9 +19,6 @@ package org.apache.rocketmq.common.protocol; import org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader; import org.apache.rocketmq.remoting.common.RemotingHelper; -import org.apache.rocketmq.remoting.exception.RemotingConnectException; -import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; -import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class MQProtosHelper { @@ -38,13 +35,7 @@ public class MQProtosHelper { if (response != null) { return ResponseCode.SUCCESS == response.getCode(); } - } catch (RemotingConnectException e) { - e.printStackTrace(); - } catch (RemotingSendRequestException e) { - e.printStackTrace(); - } catch (RemotingTimeoutException e) { - e.printStackTrace(); - } catch (InterruptedException e) { + } catch (Exception e) { e.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java index 22dac95..5498d34 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java @@ -54,7 +54,7 @@ public class MomentStatsItemSet { public void run() { try { printAtMinutes(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java index fd9bc4c..9b37f80 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java @@ -94,7 +94,7 @@ public class StatsItem { public void run() { try { samplingInSeconds(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, 0, 10, TimeUnit.SECONDS); @@ -104,7 +104,7 @@ public class StatsItem { public void run() { try { samplingInMinutes(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, 0, 10, TimeUnit.MINUTES); @@ -114,7 +114,7 @@ public class StatsItem { public void run() { try { samplingInHour(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, 0, 1, TimeUnit.HOURS); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java index 33c65ae..8633d68 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java @@ -47,7 +47,7 @@ public class StatsItemSet { public void run() { try { samplingInSeconds(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, 0, 10, TimeUnit.SECONDS); @@ -57,7 +57,7 @@ public class StatsItemSet { public void run() { try { samplingInMinutes(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, 0, 10, TimeUnit.MINUTES); @@ -67,7 +67,7 @@ public class StatsItemSet { public void run() { try { samplingInHour(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, 0, 1, TimeUnit.HOURS); @@ -77,7 +77,7 @@ public class StatsItemSet { public void run() { try { printAtMinutes(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, Math.abs(UtilAll.computNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60, TimeUnit.MILLISECONDS); @@ -87,7 +87,7 @@ public class StatsItemSet { public void run() { try { printAtHour(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, Math.abs(UtilAll.computNextHourTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60, TimeUnit.MILLISECONDS); @@ -97,7 +97,7 @@ public class StatsItemSet { public void run() { try { printAtDay(); - } catch (Throwable e) { + } catch (Throwable ignored) { } } }, Math.abs(UtilAll.computNextMorningTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 60 * 24, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java index 693718e..f5c4fad 100644 --- a/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/MixAllTest.java @@ -17,10 +17,11 @@ package org.apache.rocketmq.common; +import org.junit.Test; +import org.junit.Assert; + import java.net.InetAddress; import java.util.List; -import junit.framework.Assert; -import org.junit.Test; public class MixAllTest { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java index e1e82d8..2c9a2fb 100644 --- a/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/RemotingUtilTest.java @@ -17,12 +17,13 @@ package org.apache.rocketmq.common; import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.junit.Assert; import org.junit.Test; public class RemotingUtilTest { @Test public void test() throws Exception { String a = RemotingUtil.getLocalAddress(); - System.out.println(a); + Assert.assertTrue(a.length() > 0); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java index d39a53a..e738ed6 100644 --- a/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/protocol/ConsumeStatusTest.java @@ -19,17 +19,22 @@ package org.apache.rocketmq.common.protocol; import org.apache.rocketmq.common.protocol.body.ConsumeStatus; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.junit.Assert; import org.junit.Test; public class ConsumeStatusTest { @Test - public void decode_test() throws Exception { + public void decodeTest() throws Exception { ConsumeStatus cs = new ConsumeStatus(); - cs.setConsumeFailedTPS(0L); + cs.setConsumeFailedTPS(10); + cs.setPullRT(100); + cs.setPullTPS(1000); String json = RemotingSerializable.toJson(cs, true); - System.out.println(json); ConsumeStatus fromJson = RemotingSerializable.fromJson(json, ConsumeStatus.class); + Assert.assertEquals(fromJson.getPullRT(), cs.getPullRT(), 0.0001); + Assert.assertEquals(fromJson.getPullTPS(), cs.getPullTPS(), 0.0001); + Assert.assertEquals(fromJson.getConsumeFailedTPS(), cs.getConsumeFailedTPS(), 0.0001); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/167cce03/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index 65c9bf2..50d750d 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -49,7 +49,7 @@ public class Producer { final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 64; final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 128; - final boolean keyEnable = commandLine.hasOption('k') ? Boolean.parseBoolean(commandLine.getOptionValue('k')) : false; + final boolean keyEnable = commandLine.hasOption('k') && Boolean.parseBoolean(commandLine.getOptionValue('k')); System.out.printf("topic %s threadCount %d messageSize %d keyEnable %s%n", topic, threadCount, messageSize, keyEnable); @@ -140,7 +140,7 @@ public class Producer { try { Thread.sleep(3000); - } catch (InterruptedException e1) { + } catch (InterruptedException ignored) { } } catch (InterruptedException e) { statsBenchmark.getSendRequestFailedCount().incrementAndGet(); @@ -156,7 +156,7 @@ public class Producer { log.error("[BENCHMARK_PRODUCER] Send Exception", e); try { Thread.sleep(3000); - } catch (InterruptedException e1) { + } catch (InterruptedException ignored) { } } }
