http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java index dbfecb1..1e421ea 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterBrokerRequestHeader.java @@ -15,6 +15,8 @@ * limitations under the License. * * $Id: RegisterBrokerRequestHeader.java 1835 2013-05-16 02:00:50Z [email protected] $ + * + * $Id: RegisterBrokerRequestHeader.java 1835 2013-05-16 02:00:50Z [email protected] $ */ /**
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java index 0e04c79..a0bad3e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/RegisterOrderTopicRequestHeader.java @@ -15,6 +15,8 @@ * limitations under the License. * * $Id: RegisterOrderTopicRequestHeader.java 1835 2013-05-16 02:00:50Z [email protected] $ + * + * $Id: RegisterOrderTopicRequestHeader.java 1835 2013-05-16 02:00:50Z [email protected] $ */ /** http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/UnRegisterBrokerRequestHeader.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/UnRegisterBrokerRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/UnRegisterBrokerRequestHeader.java index 9f6fd27..070874f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/UnRegisterBrokerRequestHeader.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/namesrv/UnRegisterBrokerRequestHeader.java @@ -15,6 +15,8 @@ * limitations under the License. * * $Id: UnRegisterBrokerRequestHeader.java 1835 2013-05-16 02:00:50Z [email protected] $ + * + * $Id: UnRegisterBrokerRequestHeader.java 1835 2013-05-16 02:00:50Z [email protected] $ */ /** http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumeType.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumeType.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumeType.java index 49fe045..248b715 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumeType.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumeType.java @@ -15,6 +15,8 @@ * limitations under the License. * * $Id: ConsumeType.java 1835 2013-05-16 02:00:50Z [email protected] $ + * + * $Id: ConsumeType.java 1835 2013-05-16 02:00:50Z [email protected] $ */ /** http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java index c2a0107..75e2f9e 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ConsumerData.java @@ -15,6 +15,8 @@ * limitations under the License. * * $Id: ConsumerData.java 1835 2013-05-16 02:00:50Z [email protected] $ + * + * $Id: ConsumerData.java 1835 2013-05-16 02:00:50Z [email protected] $ */ /** http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java index 5257174..35202b9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/HeartbeatData.java @@ -15,6 +15,8 @@ * limitations under the License. * * $Id: HeartbeatData.java 1835 2013-05-16 02:00:50Z [email protected] $ + * + * $Id: HeartbeatData.java 1835 2013-05-16 02:00:50Z [email protected] $ */ /** http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MessageModel.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MessageModel.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MessageModel.java index d710502..bf27e93 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MessageModel.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/MessageModel.java @@ -15,6 +15,8 @@ * limitations under the License. * * $Id: MessageModel.java 1835 2013-05-16 02:00:50Z [email protected] $ + * + * $Id: MessageModel.java 1835 2013-05-16 02:00:50Z [email protected] $ */ /** http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ProducerData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ProducerData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ProducerData.java index b7d4c95..f30e154 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ProducerData.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/ProducerData.java @@ -15,6 +15,8 @@ * limitations under the License. * * $Id: ProducerData.java 1835 2013-05-16 02:00:50Z [email protected] $ + * + * $Id: ProducerData.java 1835 2013-05-16 02:00:50Z [email protected] $ */ /** http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java index 8c4292a..a03bdc4 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/heartbeat/SubscriptionData.java @@ -15,6 +15,8 @@ * limitations under the License. * * $Id: SubscriptionData.java 1835 2013-05-16 02:00:50Z [email protected] $ + * + * $Id: SubscriptionData.java 1835 2013-05-16 02:00:50Z [email protected] $ */ /** @@ -124,7 +126,7 @@ public class SubscriptionData implements Comparable<SubscriptionData> { return false; if (getClass() != obj.getClass()) return false; - SubscriptionData other = (SubscriptionData)obj; + SubscriptionData other = (SubscriptionData) obj; if (classFilterMode != other.classFilterMode) return false; if (codeSet == null) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java index f79bdb5..1382b24 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java @@ -15,6 +15,8 @@ * limitations under the License. * * $Id: BrokerData.java 1835 2013-05-16 02:00:50Z [email protected] $ + * + * $Id: BrokerData.java 1835 2013-05-16 02:00:50Z [email protected] $ */ /** @@ -78,7 +80,7 @@ public class BrokerData implements Comparable<BrokerData> { return false; if (getClass() != obj.getClass()) return false; - BrokerData other = (BrokerData)obj; + BrokerData other = (BrokerData) obj; if (brokerAddrs == null) { if (other.brokerAddrs != null) return false; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java index 3fe3e2c..94328ae 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/QueueData.java @@ -15,6 +15,8 @@ * limitations under the License. * * $Id: QueueData.java 1835 2013-05-16 02:00:50Z [email protected] $ + * + * $Id: QueueData.java 1835 2013-05-16 02:00:50Z [email protected] $ */ /** @@ -81,7 +83,7 @@ public class QueueData implements Comparable<QueueData> { return false; if (getClass() != obj.getClass()) return false; - QueueData other = (QueueData)obj; + QueueData other = (QueueData) obj; if (brokerName == null) { if (other.brokerName != null) return false; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java index 64d9726..b4fd25a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java +++ b/common/src/main/java/org/apache/rocketmq/common/protocol/route/TopicRouteData.java @@ -15,6 +15,8 @@ * limitations under the License. * * $Id: TopicRouteData.java 1835 2013-05-16 02:00:50Z [email protected] $ + * + * $Id: TopicRouteData.java 1835 2013-05-16 02:00:50Z [email protected] $ */ /** @@ -106,7 +108,7 @@ public class TopicRouteData extends RemotingSerializable { return false; if (getClass() != obj.getClass()) return false; - TopicRouteData other = (TopicRouteData)obj; + TopicRouteData other = (TopicRouteData) obj; if (brokerDatas == null) { if (other.brokerDatas != null) return false; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/queue/ConcurrentTreeMap.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/queue/ConcurrentTreeMap.java b/common/src/main/java/org/apache/rocketmq/common/queue/ConcurrentTreeMap.java index 021ba83..106384c 100644 --- a/common/src/main/java/org/apache/rocketmq/common/queue/ConcurrentTreeMap.java +++ b/common/src/main/java/org/apache/rocketmq/common/queue/ConcurrentTreeMap.java @@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory; /** * thread safe - * */ public class ConcurrentTreeMap<K, V> { private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/queue/RoundQueue.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/queue/RoundQueue.java b/common/src/main/java/org/apache/rocketmq/common/queue/RoundQueue.java index e9f2177..7d6d47f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/queue/RoundQueue.java +++ b/common/src/main/java/org/apache/rocketmq/common/queue/RoundQueue.java @@ -22,7 +22,6 @@ import java.util.Queue; /** * not thread safe - * */ public class RoundQueue<E> { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java b/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java index 9bf97fb..315c18d 100644 --- a/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/subscription/SubscriptionGroupConfig.java @@ -114,7 +114,7 @@ public class SubscriptionGroupConfig { public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + (int)(brokerId ^ (brokerId >>> 32)); + result = prime * result + (int) (brokerId ^ (brokerId >>> 32)); result = prime * result + (consumeBroadcastEnable ? 1231 : 1237); result = prime * result + (consumeEnable ? 1231 : 1237); result = prime * result + (consumeFromMinEnable ? 1231 : 1237); @@ -123,7 +123,7 @@ public class SubscriptionGroupConfig { result = prime * result + retryMaxTimes; result = prime * result + retryQueueNums; result = - prime * result + (int)(whichBrokerWhenConsumeSlowly ^ (whichBrokerWhenConsumeSlowly >>> 32)); + prime * result + (int) (whichBrokerWhenConsumeSlowly ^ (whichBrokerWhenConsumeSlowly >>> 32)); return result; } @@ -135,7 +135,7 @@ public class SubscriptionGroupConfig { return false; if (getClass() != obj.getClass()) return false; - SubscriptionGroupConfig other = (SubscriptionGroupConfig)obj; + SubscriptionGroupConfig other = (SubscriptionGroupConfig) obj; if (brokerId != other.brokerId) return false; if (consumeBroadcastEnable != other.consumeBroadcastEnable) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java index dcb9187..52c3e48 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/ChannelUtil.java @@ -23,7 +23,7 @@ import java.net.InetSocketAddress; public class ChannelUtil { public static String getRemoteIp(Channel channel) { - InetSocketAddress inetSocketAddress = (InetSocketAddress)channel.remoteAddress(); + InetSocketAddress inetSocketAddress = (InetSocketAddress) channel.remoteAddress(); if (inetSocketAddress == null) { return ""; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java b/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java index 0cc3463..7cc9446 100755 --- a/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/HttpTinyClient.java @@ -36,10 +36,10 @@ public class HttpTinyClient { HttpURLConnection conn = null; try { - conn = (HttpURLConnection)new URL(url).openConnection(); + conn = (HttpURLConnection) new URL(url).openConnection(); conn.setRequestMethod("GET"); - conn.setConnectTimeout((int)readTimeoutMs); - conn.setReadTimeout((int)readTimeoutMs); + conn.setConnectTimeout((int) readTimeoutMs); + conn.setReadTimeout((int) readTimeoutMs); setHeaders(conn, headers, encoding); conn.connect(); @@ -90,20 +90,12 @@ public class HttpTinyClient { } /** - - * * @param url * @param headers - * @param paramValues - * @param encoding - * @param readTimeoutMs - - * * @return the http response of given http post request - * * @throws java.io.IOException */ static public HttpResult httpPost(String url, List<String> headers, List<String> paramValues, @@ -112,10 +104,10 @@ public class HttpTinyClient { HttpURLConnection conn = null; try { - conn = (HttpURLConnection)new URL(url).openConnection(); + conn = (HttpURLConnection) new URL(url).openConnection(); conn.setRequestMethod("POST"); conn.setConnectTimeout(3000); - conn.setReadTimeout((int)readTimeoutMs); + conn.setReadTimeout((int) readTimeoutMs); conn.setDoOutput(true); conn.setDoInput(true); setHeaders(conn, headers, encoding); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java index b569c24..e2e7cba 100644 --- a/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java +++ b/common/src/main/java/org/apache/rocketmq/common/utils/IOTinyUtils.java @@ -56,14 +56,11 @@ public class IOTinyUtils { return count; } - /** - - */ static public List<String> readLines(Reader input) throws IOException { BufferedReader reader = toBufferedReader(input); List<String> list = new ArrayList<String>(); - String line = null; - for (; ; ) { + String line; + for (;;) { line = reader.readLine(); if (null != line) { list.add(line); @@ -75,7 +72,7 @@ public class IOTinyUtils { } static private BufferedReader toBufferedReader(Reader reader) { - return reader instanceof BufferedReader ? (BufferedReader)reader : new BufferedReader(reader); + return reader instanceof BufferedReader ? (BufferedReader) reader : new BufferedReader(reader); } static public void copyFile(String source, String target) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index f810f5a..a3e06aa 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -76,9 +76,9 @@ public class Consumer { Long[] end = snapshotList.getLast(); final long consumeTps = - (long)(((end[1] - begin[1]) / (double)(end[0] - begin[0])) * 1000L); - final double averageB2CRT = (end[2] - begin[2]) / (double)(end[1] - begin[1]); - final double averageS2CRT = (end[3] - begin[3]) / (double)(end[1] - begin[1]); + (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L); + final double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]); + final double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]); System.out.printf("Consume TPS: %d Average(B2C) RT: %7.3f Average(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n", consumeTps, averageB2CRT, averageS2CRT, end[4], end[5] http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index 88e9a4f..8a1dd31 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -81,8 +81,8 @@ public class Producer { Long[] begin = snapshotList.getFirst(); Long[] end = snapshotList.getLast(); - final long sendTps = (long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L); - final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]); + final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); + final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); System.out.printf("Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d%n", sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java index ce4b1ab..2a62b10 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java @@ -73,8 +73,8 @@ public class TransactionProducer { Long[] end = snapshotList.getLast(); final long sendTps = - (long)(((end[3] - begin[3]) / (double)(end[0] - begin[0])) * 1000L); - final double averageRT = (end[5] - begin[5]) / (double)(end[3] - begin[3]); + (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); + final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); System.out.printf( "Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n", http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java ---------------------------------------------------------------------- diff --git a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java index 7abbb5a..9680780 100644 --- a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java @@ -44,7 +44,7 @@ public class Producer { SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { - Integer id = (Integer)arg; + Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java index 32b8bad..c01aadc 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FilterServerOuterAPI.java @@ -60,7 +60,7 @@ public class FilterServerOuterAPI { switch (response.getCode()) { case ResponseCode.SUCCESS: { RegisterFilterServerResponseHeader responseHeader = - (RegisterFilterServerResponseHeader)response + (RegisterFilterServerResponseHeader) response .decodeCommandCustomHeader(RegisterFilterServerResponseHeader.class); return responseHeader; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java index f239caf..7df4748 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/FiltersrvStartup.java @@ -120,7 +120,7 @@ public class FiltersrvStartup { System.exit(-2); } - LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory(); + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java index 11102d0..e3b3441 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/DynaCode.java @@ -95,7 +95,7 @@ public class DynaCode { StringBuffer buf = new StringBuffer(); while (cl != null) { if (cl instanceof URLClassLoader) { - URL urls[] = ((URLClassLoader)cl).getURLs(); + URL urls[] = ((URLClassLoader) cl).getURLs(); for (int i = 0; i < urls.length; i++) { if (buf.length() > 0) { buf.append(File.pathSeparatorChar); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java index 66389e0..e4e6deb 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/filter/FilterClassManager.java @@ -84,7 +84,7 @@ public class FilterClassManager { Class<?> newClass = DynaCode.compileAndLoadClass(filterClassInfo.getClassName(), javaSource); Object newInstance = newClass.newInstance(); - filterClassInfo.setMessageFilter((MessageFilter)newInstance); + filterClassInfo.setMessageFilter((MessageFilter) newInstance); filterClassInfo.setClassCRC(classCRC); log.info("fetch Remote class File OK, {} {}", next.getKey(), @@ -134,7 +134,7 @@ public class FilterClassManager { String javaSource = new String(filterSourceBinary, MixAll.DEFAULT_CHARSET); Class<?> newClass = DynaCode.compileAndLoadClass(className, javaSource); Object newInstance = newClass.newInstance(); - filterClassInfoNew.setMessageFilter((MessageFilter)newInstance); + filterClassInfoNew.setMessageFilter((MessageFilter) newInstance); filterClassInfoNew.setClassCRC(classCRC); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java ---------------------------------------------------------------------- diff --git a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java index 1d56ac1..2740b21 100644 --- a/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java +++ b/filtersrv/src/main/java/org/apache/rocketmq/filtersrv/processor/DefaultRequestProcessor.java @@ -85,7 +85,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { private RemotingCommand registerMessageFilterClass(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final RegisterMessageFilterClassRequestHeader requestHeader = - (RegisterMessageFilterClassRequestHeader)request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class); + (RegisterMessageFilterClassRequestHeader) request.decodeCommandCustomHeader(RegisterMessageFilterClassRequestHeader.class); try { boolean ok = this.filtersrvController.getFilterClassManager().registerFilterClass(requestHeader.getConsumerGroup(), @@ -109,9 +109,9 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { private RemotingCommand pullMessageForward(final ChannelHandlerContext ctx, final RemotingCommand request) throws Exception { final RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class); - final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader)response.readCustomHeader(); + final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader(); final PullMessageRequestHeader requestHeader = - (PullMessageRequestHeader)request.decodeCommandCustomHeader(PullMessageRequestHeader.class); + (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class); final FilterContext filterContext = new FilterContext(); filterContext.setConsumerGroup(requestHeader.getConsumerGroup()); @@ -331,10 +331,10 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { if (bodyLength > 0) msgStoreItemMemory.put(msgInner.getBody()); // 16 TOPIC - msgStoreItemMemory.put((byte)topicLength); + msgStoreItemMemory.put((byte) topicLength); msgStoreItemMemory.put(topicData); // 17 PROPERTIES - msgStoreItemMemory.putShort((short)propertiesLength); + msgStoreItemMemory.putShort((short) propertiesLength); if (propertiesLength > 0) msgStoreItemMemory.put(propertiesData); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java index 0eb9a52..eb1afc8 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/NamesrvStartup.java @@ -102,7 +102,7 @@ public class NamesrvStartup { System.exit(-2); } - LoggerContext lc = (LoggerContext)LoggerFactory.getILoggerFactory(); + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java index 95410fa..d66b3e8 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/ClusterTestRequestProcessor.java @@ -53,7 +53,7 @@ public class ClusterTestRequestProcessor extends DefaultRequestProcessor { public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetRouteInfoRequestHeader requestHeader = - (GetRouteInfoRequestHeader)request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); + (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic()); if (topicRouteData != null) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java index 0135274..213b096 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java @@ -127,7 +127,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { public RemotingCommand putKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final PutKVConfigRequestHeader requestHeader = - (PutKVConfigRequestHeader)request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class); + (PutKVConfigRequestHeader) request.decodeCommandCustomHeader(PutKVConfigRequestHeader.class); this.namesrvController.getKvConfigManager().putKVConfig( requestHeader.getNamespace(), @@ -142,9 +142,9 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { public RemotingCommand getKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(GetKVConfigResponseHeader.class); - final GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader)response.readCustomHeader(); + final GetKVConfigResponseHeader responseHeader = (GetKVConfigResponseHeader) response.readCustomHeader(); final GetKVConfigRequestHeader requestHeader = - (GetKVConfigRequestHeader)request.decodeCommandCustomHeader(GetKVConfigRequestHeader.class); + (GetKVConfigRequestHeader) request.decodeCommandCustomHeader(GetKVConfigRequestHeader.class); String value = this.namesrvController.getKvConfigManager().getKVConfig( requestHeader.getNamespace(), @@ -166,7 +166,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { public RemotingCommand deleteKVConfig(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final DeleteKVConfigRequestHeader requestHeader = - (DeleteKVConfigRequestHeader)request.decodeCommandCustomHeader(DeleteKVConfigRequestHeader.class); + (DeleteKVConfigRequestHeader) request.decodeCommandCustomHeader(DeleteKVConfigRequestHeader.class); this.namesrvController.getKvConfigManager().deleteKVConfig( requestHeader.getNamespace(), @@ -181,9 +181,9 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class); - final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader)response.readCustomHeader(); + final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader(); final RegisterBrokerRequestHeader requestHeader = - (RegisterBrokerRequestHeader)request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); + (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody(); @@ -217,9 +217,9 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { public RemotingCommand registerBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class); - final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader)response.readCustomHeader(); + final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader(); final RegisterBrokerRequestHeader requestHeader = - (RegisterBrokerRequestHeader)request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); + (RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class); TopicConfigSerializeWrapper topicConfigWrapper; if (request.getBody() != null) { @@ -254,7 +254,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { public RemotingCommand unregisterBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final UnRegisterBrokerRequestHeader requestHeader = - (UnRegisterBrokerRequestHeader)request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class); + (UnRegisterBrokerRequestHeader) request.decodeCommandCustomHeader(UnRegisterBrokerRequestHeader.class); this.namesrvController.getRouteInfoManager().unregisterBroker( requestHeader.getClusterName(), @@ -270,7 +270,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetRouteInfoRequestHeader requestHeader = - (GetRouteInfoRequestHeader)request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); + (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic()); @@ -308,9 +308,9 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { private RemotingCommand wipeWritePermOfBroker(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(WipeWritePermOfBrokerResponseHeader.class); - final WipeWritePermOfBrokerResponseHeader responseHeader = (WipeWritePermOfBrokerResponseHeader)response.readCustomHeader(); + final WipeWritePermOfBrokerResponseHeader responseHeader = (WipeWritePermOfBrokerResponseHeader) response.readCustomHeader(); final WipeWritePermOfBrokerRequestHeader requestHeader = - (WipeWritePermOfBrokerRequestHeader)request.decodeCommandCustomHeader(WipeWritePermOfBrokerRequestHeader.class); + (WipeWritePermOfBrokerRequestHeader) request.decodeCommandCustomHeader(WipeWritePermOfBrokerRequestHeader.class); int wipeTopicCnt = this.namesrvController.getRouteInfoManager().wipeWritePermOfBrokerByLock(requestHeader.getBrokerName()); @@ -339,7 +339,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final DeleteTopicInNamesrvRequestHeader requestHeader = - (DeleteTopicInNamesrvRequestHeader)request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class); + (DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class); this.namesrvController.getRouteInfoManager().deleteTopic(requestHeader.getTopic()); @@ -351,7 +351,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { private RemotingCommand getKVListByNamespace(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetKVListByNamespaceRequestHeader requestHeader = - (GetKVListByNamespaceRequestHeader)request.decodeCommandCustomHeader(GetKVListByNamespaceRequestHeader.class); + (GetKVListByNamespaceRequestHeader) request.decodeCommandCustomHeader(GetKVListByNamespaceRequestHeader.class); byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace( requestHeader.getNamespace()); @@ -370,7 +370,7 @@ public class DefaultRequestProcessor implements NettyRequestProcessor { private RemotingCommand getTopicsByCluster(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetTopicsByClusterRequestHeader requestHeader = - (GetTopicsByClusterRequestHeader)request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class); + (GetTopicsByClusterRequestHeader) request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class); byte[] body = this.namesrvController.getRouteInfoManager().getTopicsByCluster(requestHeader.getCluster()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java ---------------------------------------------------------------------- diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java index e440e61..95dca3a 100644 --- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java +++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java @@ -385,7 +385,7 @@ public class RouteInfoManager { if (null != brokerData) { BrokerData brokerDataClone = new BrokerData(); brokerDataClone.setBrokerName(brokerData.getBrokerName()); - brokerDataClone.setBrokerAddrs((HashMap<Long, String>)brokerData + brokerDataClone.setBrokerAddrs((HashMap<Long, String>) brokerData .getBrokerAddrs().clone()); brokerDataList.add(brokerDataClone); foundBrokerData = true; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f74b5e2..76acc20 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <parent> @@ -309,7 +309,7 @@ <id>verify</id> <phase>verify</phase> <configuration> - <configLocation>checkstyle/checkstyle.xml</configLocation> + <configLocation>style/rmq_checkstyle.xml</configLocation> <encoding>UTF-8</encoding> <consoleOutput>true</consoleOutput> <failsOnError>true</failsOnError> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java index 8d189e7..4fb4ed6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java @@ -67,7 +67,7 @@ public class RemotingHelper { socketChannel.configureBlocking(true); //bugfix http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802 - socketChannel.socket().setSoTimeout((int)timeoutMillis); + socketChannel.socket().setSoTimeout((int) timeoutMillis); ByteBuffer byteBufferRequest = request.encode(); while (byteBufferRequest.hasRemaining()) { @@ -168,7 +168,7 @@ public class RemotingHelper { if (null == channel) { return ""; } - final InetSocketAddress remote = (InetSocketAddress)channel.remoteAddress(); + final InetSocketAddress remote = (InetSocketAddress) channel.remoteAddress(); if (remote != null) { return remote.getAddress().getHostName(); } @@ -188,7 +188,7 @@ public class RemotingHelper { public static String parseSocketAddressName(SocketAddress socketAddress) { - final InetSocketAddress addrs = (InetSocketAddress)socketAddress; + final InetSocketAddress addrs = (InetSocketAddress) socketAddress; if (addrs != null) { return addrs.getAddress().getHostName(); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java index bcc2232..5589c04 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingUtil.java @@ -67,7 +67,7 @@ public class RemotingUtil { try { final Method method = providerClazz.getMethod("provider"); if (method != null) { - final SelectorProvider selectorProvider = (SelectorProvider)method.invoke(null); + final SelectorProvider selectorProvider = (SelectorProvider) method.invoke(null); if (selectorProvider != null) { result = selectorProvider.openSelector(); } @@ -155,7 +155,7 @@ public class RemotingUtil { public static String socketAddress2String(final SocketAddress addr) { StringBuilder sb = new StringBuilder(); - InetSocketAddress inetSocketAddress = (InetSocketAddress)addr; + InetSocketAddress inetSocketAddress = (InetSocketAddress) addr; sb.append(inetSocketAddress.getAddress().getHostAddress()); sb.append(":"); sb.append(inetSocketAddress.getPort()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java index 9cccaaf..62b26ba 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/ServiceThread.java @@ -21,8 +21,6 @@ import org.slf4j.LoggerFactory; /** * Base class for background thread - * - * */ public abstract class ServiceThread implements Runnable { private static final Logger STLOG = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java index 73d7f2b..106e55c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java @@ -42,7 +42,7 @@ public class NettyDecoder extends LengthFieldBasedFrameDecoder { public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = null; try { - frame = (ByteBuf)super.decode(ctx, in); + frame = (ByteBuf) super.decode(ctx, in); if (null == frame) { return null; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index db7815a..cd8ef97 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -637,7 +637,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { - IdleStateEvent evnet = (IdleStateEvent)evt; + IdleStateEvent evnet = (IdleStateEvent) evt; if (evnet.state().equals(IdleState.ALL_IDLE)) { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", remoteAddress); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index f109086..58ba617 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -171,7 +171,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti try { ChannelFuture sync = this.serverBootstrap.bind().sync(); - InetSocketAddress addr = (InetSocketAddress)sync.channel().localAddress(); + InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); this.port = addr.getPort(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); @@ -337,7 +337,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { - IdleStateEvent evnet = (IdleStateEvent)evt; + IdleStateEvent evnet = (IdleStateEvent) evt; if (evnet.state().equals(IdleState.ALL_IDLE)) { final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel()); log.warn("NETTY SERVER PIPELINE: IDLE exception [{}]", remoteAddress); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java index c6251e9..020124b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRequestProcessor.java @@ -21,8 +21,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; /** * Common remoting command processor - * - * */ public interface NettyRequestProcessor { RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java index f69fded..8424da6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java @@ -133,6 +133,6 @@ public class NettyServerConfig implements Cloneable { @Override public Object clone() throws CloneNotSupportedException { - return (NettyServerConfig)super.clone(); + return (NettyServerConfig) super.clone(); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java index 0443b43..d1fbbfe 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RequestTask.java @@ -36,7 +36,7 @@ public class RequestTask implements Runnable { @Override public int hashCode() { int result = runnable != null ? runnable.hashCode() : 0; - result = 31 * result + (int)(getCreateTimestamp() ^ (getCreateTimestamp() >>> 32)); + result = 31 * result + (int) (getCreateTimestamp() ^ (getCreateTimestamp() >>> 32)); result = 31 * result + (channel != null ? channel.hashCode() : 0); result = 31 * result + (request != null ? request.hashCode() : 0); result = 31 * result + (isStopRun() ? 1 : 0); @@ -50,7 +50,7 @@ public class RequestTask implements Runnable { if (!(o instanceof RequestTask)) return false; - final RequestTask that = (RequestTask)o; + final RequestTask that = (RequestTask) o; if (getCreateTimestamp() != that.getCreateTimestamp()) return false; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java index bdb02c6..cc5e8ec 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/LanguageCode.java @@ -18,15 +18,15 @@ package org.apache.rocketmq.remoting.protocol; public enum LanguageCode { - JAVA((byte)0), - CPP((byte)1), - DOTNET((byte)2), - PYTHON((byte)3), - DELPHI((byte)4), - ERLANG((byte)5), - RUBY((byte)6), - OTHER((byte)7), - HTTP((byte)8); + JAVA((byte) 0), + CPP((byte) 1), + DOTNET((byte) 2), + PYTHON((byte) 3), + DELPHI((byte) 4), + ERLANG((byte) 5), + RUBY((byte) 6), + OTHER((byte) 7), + HTTP((byte) 8); private byte code; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java index 6b253dc..cffa072 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java @@ -195,7 +195,7 @@ public class RemotingCommand { } public static SerializeType getProtocolType(int source) { - return SerializeType.valueOf((byte)((source >> 24) & 0xFF)); + return SerializeType.valueOf((byte) ((source >> 24) & 0xFF)); } public static int createNewRequestId() { @@ -223,9 +223,9 @@ public class RemotingCommand { byte[] result = new byte[4]; result[0] = type.getCode(); - result[1] = (byte)((source >> 16) & 0xFF); - result[2] = (byte)((source >> 8) & 0xFF); - result[3] = (byte)(source & 0xFF); + result[1] = (byte) ((source >> 16) & 0xFF); + result[2] = (byte) ((source >> 8) & 0xFF); + result[3] = (byte) (source & 0xFF); return result; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java index 64b37db..b773c1d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RocketMQSerializable.java @@ -51,11 +51,11 @@ public class RocketMQSerializable { // ################### content ByteBuffer headerBuffer = ByteBuffer.allocate(totalLen); // int code(~32767) - headerBuffer.putShort((short)cmd.getCode()); + headerBuffer.putShort((short) cmd.getCode()); // LanguageCode language headerBuffer.put(cmd.getLanguage().getCode()); // int version(~32767) - headerBuffer.putShort((short)cmd.getVersion()); + headerBuffer.putShort((short) cmd.getVersion()); // int opaque headerBuffer.putInt(cmd.getOpaque()); // int flag @@ -109,7 +109,7 @@ public class RocketMQSerializable { key = entry.getKey().getBytes(RemotingSerializable.CHARSET_UTF8); val = entry.getValue().getBytes(RemotingSerializable.CHARSET_UTF8); - content.putShort((short)key.length); + content.putShort((short) key.length); content.put(key); content.putInt(val.length); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java ---------------------------------------------------------------------- diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java index 6bfd42c..4ca6357 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/SerializeType.java @@ -18,8 +18,8 @@ package org.apache.rocketmq.remoting.protocol; public enum SerializeType { - JSON((byte)0), - ROCKETMQ((byte)1); + JSON((byte) 0), + ROCKETMQ((byte) 1); private byte code; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/remoting/src/test/java/org/apache/rocketmq/remoting/MixTest.java ---------------------------------------------------------------------- diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/MixTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/MixTest.java index 984ecd1..db165c2 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/MixTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/MixTest.java @@ -15,6 +15,8 @@ * limitations under the License. * * $Id: MixTest.java 1831 2013-05-16 01:39:51Z [email protected] $ + * + * $Id: MixTest.java 1831 2013-05-16 01:39:51Z [email protected] $ */ /** http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/remoting/src/test/java/org/apache/rocketmq/remoting/NettyRPCTest.java ---------------------------------------------------------------------- diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/NettyRPCTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/NettyRPCTest.java index 15a9aa3..1cfb8bc 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/NettyRPCTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/NettyRPCTest.java @@ -15,6 +15,8 @@ * limitations under the License. * * $Id: NettyRPCTest.java 1831 2013-05-16 01:39:51Z [email protected] $ + * + * $Id: NettyRPCTest.java 1831 2013-05-16 01:39:51Z [email protected] $ */ /** @@ -37,11 +39,16 @@ import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import static org.junit.Assert.assertTrue; public class NettyRPCTest { + private static RemotingServer remotingServer; + private static RemotingClient remotingClient; + public static RemotingServer createRemotingServer() throws InterruptedException { NettyServerConfig config = new NettyServerConfig(); RemotingServer remotingServer = new NettyRemotingServer(config); @@ -71,84 +78,70 @@ public class NettyRPCTest { return client; } + @BeforeClass + public static void initialize() throws InterruptedException { + remotingServer = createRemotingServer(); + remotingClient = createRemotingClient(); + } + + @AfterClass + public static void destroy() { + remotingClient.shutdown(); + remotingServer.shutdown(); + } + @Test public void test_RPC_Sync() throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { - RemotingServer server = createRemotingServer(); - RemotingClient client = createRemotingClient(); for (int i = 0; i < 100; i++) { TestRequestHeader requestHeader = new TestRequestHeader(); requestHeader.setCount(i); requestHeader.setMessageTitle("HelloMessageTitle"); RemotingCommand request = RemotingCommand.createRequestCommand(0, requestHeader); - RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3000); + RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 3000); System.out.println("invoke result = " + response); assertTrue(response != null); } - - client.shutdown(); - server.shutdown(); - System.out.println("-----------------------------------------------------------------"); } @Test public void test_RPC_Oneway() throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException { - RemotingServer server = createRemotingServer(); - RemotingClient client = createRemotingClient(); for (int i = 0; i < 100; i++) { RemotingCommand request = RemotingCommand.createRequestCommand(0, null); request.setRemark(String.valueOf(i)); - client.invokeOneway("localhost:8888", request, 1000 * 3); + remotingClient.invokeOneway("localhost:8888", request, 1000 * 3); } - - client.shutdown(); - server.shutdown(); - System.out.println("-----------------------------------------------------------------"); } @Test public void test_RPC_Async() throws InterruptedException, RemotingConnectException, RemotingTimeoutException, RemotingTooMuchRequestException, RemotingSendRequestException { - RemotingServer server = createRemotingServer(); - RemotingClient client = createRemotingClient(); for (int i = 0; i < 100; i++) { RemotingCommand request = RemotingCommand.createRequestCommand(0, null); request.setRemark(String.valueOf(i)); - client.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() { + remotingClient.invokeAsync("localhost:8888", request, 1000 * 3, new InvokeCallback() { @Override public void operationComplete(ResponseFuture responseFuture) { System.out.println(responseFuture.getResponseCommand()); } }); } - - Thread.sleep(1000 * 3); - - client.shutdown(); - server.shutdown(); - System.out.println("-----------------------------------------------------------------"); } @Test public void test_server_call_client() throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { - final RemotingServer server = createRemotingServer(); - final RemotingClient client = createRemotingClient(); - server.registerProcessor(0, new NettyRequestProcessor() { + remotingServer.registerProcessor(0, new NettyRequestProcessor() { @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { try { - return server.invokeSync(ctx.channel(), request, 1000 * 10); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (RemotingSendRequestException e) { - e.printStackTrace(); - } catch (RemotingTimeoutException e) { + return remotingServer.invokeSync(ctx.channel(), request, 1000 * 10); + } catch (InterruptedException | RemotingSendRequestException | RemotingTimeoutException e) { e.printStackTrace(); } @@ -161,7 +154,7 @@ public class NettyRPCTest { } }, Executors.newCachedThreadPool()); - client.registerProcessor(0, new NettyRequestProcessor() { + remotingClient.registerProcessor(0, new NettyRequestProcessor() { @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { System.out.println("client receive server request = " + request); @@ -177,14 +170,10 @@ public class NettyRPCTest { for (int i = 0; i < 3; i++) { RemotingCommand request = RemotingCommand.createRequestCommand(0, null); - RemotingCommand response = client.invokeSync("localhost:8888", request, 1000 * 3); + RemotingCommand response = remotingClient.invokeSync("localhost:8888", request, 1000 * 3); System.out.println("invoke result = " + response); assertTrue(response != null); } - - client.shutdown(); - server.shutdown(); - System.out.println("-----------------------------------------------------------------"); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java index 27b957f..f33af76 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java +++ b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java @@ -306,7 +306,7 @@ public class AllocateMappedFileService extends ServiceThread { return false; if (getClass() != obj.getClass()) return false; - AllocateRequest other = (AllocateRequest)obj; + AllocateRequest other = (AllocateRequest) obj; if (filePath == null) { if (other.filePath != null) return false; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java index 6d158d3..59c9a38 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageCallback.java @@ -20,8 +20,6 @@ import java.nio.ByteBuffer; /** * Write messages callback interface - * - * */ public interface AppendMessageCallback { @@ -31,7 +29,6 @@ public interface AppendMessageCallback { * @param byteBuffer * @param maxBlank * @param msg - * * @return How many bytes to write */ AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java index 965097f..1fda351 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageResult.java @@ -18,7 +18,6 @@ package org.apache.rocketmq.store; /** * When write a message to the commit log, returns results - * */ public class AppendMessageResult { // Return code http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/store/src/main/java/org/apache/rocketmq/store/AppendMessageStatus.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/AppendMessageStatus.java b/store/src/main/java/org/apache/rocketmq/store/AppendMessageStatus.java index 39cf9fa..30725f3 100644 --- a/store/src/main/java/org/apache/rocketmq/store/AppendMessageStatus.java +++ b/store/src/main/java/org/apache/rocketmq/store/AppendMessageStatus.java @@ -18,8 +18,6 @@ package org.apache.rocketmq.store; /** * When write a message to the commit log, returns code - * - * */ public enum AppendMessageStatus { PUT_OK, http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java index 17625f4..06df287 100644 --- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java +++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java @@ -143,7 +143,7 @@ public class CommitLog { int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound); if (mappedFile != null) { - int pos = (int)(offset % mappedFileSize); + int pos = (int) (offset % mappedFileSize); SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos); return result; } @@ -637,7 +637,7 @@ public class CommitLog { // Synchronization flush if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { - final GroupCommitService service = (GroupCommitService)this.flushCommitLogService; + final GroupCommitService service = (GroupCommitService) this.flushCommitLogService; if (msg.isWaitStoreMsgOK()) { request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes()); service.putRequest(request); @@ -729,7 +729,7 @@ public class CommitLog { int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0); if (mappedFile != null) { - int pos = (int)(offset % mappedFileSize); + int pos = (int) (offset % mappedFileSize); return mappedFile.selectMappedBuffer(pos, size); } return null; @@ -1150,7 +1150,7 @@ public class CommitLog { final byte[] propertiesData = msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8); - final short propertiesLength = propertiesData == null ? 0 : (short)propertiesData.length; + final short propertiesLength = propertiesData == null ? 0 : (short) propertiesData.length; if (propertiesLength > Short.MAX_VALUE) { log.warn("putMessage message properties length too long. length={}", propertiesData.length); @@ -1226,7 +1226,7 @@ public class CommitLog { if (bodyLength > 0) this.msgStoreItemMemory.put(msgInner.getBody()); // 16 TOPIC - this.msgStoreItemMemory.put((byte)topicLength); + this.msgStoreItemMemory.put((byte) topicLength); this.msgStoreItemMemory.put(topicData); // 17 PROPERTIES this.msgStoreItemMemory.putShort(propertiesLength); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java index d9e2f03..a060638 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java @@ -131,7 +131,7 @@ public class ConsumeQueue { if (mappedFile != null) { long offset = 0; int low = - minLogicOffset > mappedFile.getFileFromOffset() ? (int)(minLogicOffset - mappedFile + minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile .getFileFromOffset()) : 0; int high = 0; int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1; @@ -407,7 +407,7 @@ public class ConsumeQueue { byteBuffer.putInt(Integer.MAX_VALUE); byteBuffer.putLong(0L); - int until = (int)(untilWhere % this.mappedFileQueue.getMappedFileSize()); + int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize()); for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) { mappedFile.appendMessage(byteBuffer.array()); } @@ -419,7 +419,7 @@ public class ConsumeQueue { if (offset >= this.getMinLogicOffset()) { MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset); if (mappedFile != null) { - SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int)(offset % mappedFileSize)); + SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); return result; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index 7e3af19..6d182e6 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -459,7 +459,7 @@ public class DefaultMessageStore implements MessageStore { nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long diff = maxOffsetPy - maxPhyOffsetPulling; - long memory = (long)(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE + long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); getResult.setSuggestPullingFromSlave(diff > memory); } finally { @@ -1016,7 +1016,7 @@ public class DefaultMessageStore implements MessageStore { } private boolean checkInDiskByCommitOffset(long offsetPy, long maxOffsetPy) { - long memory = (long)(StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); + long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); return (maxOffsetPy - offsetPy) > memory; } @@ -1288,6 +1288,24 @@ public class DefaultMessageStore implements MessageStore { return brokerStatsManager; } + public int remainTransientStoreBufferNumbs() { + return this.transientStorePool.remainBufferNumbs(); + } + + @Override + public boolean isTransientStorePoolDeficient() { + return remainTransientStoreBufferNumbs() == 0; + } + + public void unlockMappedFile(final MappedFile mappedFile) { + this.scheduledExecutorService.schedule(new Runnable() { + @Override + public void run() { + mappedFile.munlock(); + } + }, 6, TimeUnit.SECONDS); + } + class CleanCommitLogService { private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20; @@ -1565,6 +1583,10 @@ public class DefaultMessageStore implements MessageStore { return reputFromOffset; } + public void setReputFromOffset(long reputFromOffset) { + this.reputFromOffset = reputFromOffset; + } + @Override public void shutdown() { for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) { @@ -1582,10 +1604,6 @@ public class DefaultMessageStore implements MessageStore { super.shutdown(); } - public void setReputFromOffset(long reputFromOffset) { - this.reputFromOffset = reputFromOffset; - } - public long behind() { return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset; } @@ -1683,22 +1701,4 @@ public class DefaultMessageStore implements MessageStore { } } - - public int remainTransientStoreBufferNumbs() { - return this.transientStorePool.remainBufferNumbs(); - } - - @Override - public boolean isTransientStorePoolDeficient() { - return remainTransientStoreBufferNumbs() == 0; - } - - public void unlockMappedFile(final MappedFile mappedFile) { - this.scheduledExecutorService.schedule(new Runnable() { - @Override - public void run() { - mappedFile.munlock(); - } - }, 6, TimeUnit.SECONDS); - } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java index b7d33f3..3c7e007 100644 --- a/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java +++ b/store/src/main/java/org/apache/rocketmq/store/GetMessageResult.java @@ -86,7 +86,7 @@ public class GetMessageResult { this.messageMapedList.add(mapedBuffer); this.messageBufferList.add(mapedBuffer.getByteBuffer()); this.bufferTotalSize += mapedBuffer.getSize(); - this.msgCount4Commercial += (int)Math.ceil( + this.msgCount4Commercial += (int) Math.ceil( mapedBuffer.getSize() / BrokerStatsManager.SIZE_PER_COUNT); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/853b167b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java index 6803ec9..7c2d703 100644 --- a/store/src/main/java/org/apache/rocketmq/store/MappedFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/MappedFile.java @@ -125,7 +125,7 @@ public class MappedFile extends ReferenceResource { } } - ByteBuffer viewedBuffer = (ByteBuffer)invoke(buffer, methodName); + ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName); if (viewedBuffer == null) return buffer; else @@ -463,7 +463,7 @@ public class MappedFile extends ReferenceResource { int flush = 0; long time = System.currentTimeMillis(); for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) { - byteBuffer.put(i, (byte)0); + byteBuffer.put(i, (byte) 0); // force flush when flush disk type is sync if (type == FlushDiskType.SYNC_FLUSH) { if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) { @@ -522,7 +522,7 @@ public class MappedFile extends ReferenceResource { public void mlock() { final long beginTime = System.currentTimeMillis(); - final long address = ((DirectBuffer)(this.mappedByteBuffer)).address(); + final long address = ((DirectBuffer) (this.mappedByteBuffer)).address(); Pointer pointer = new Pointer(address); { int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize)); @@ -537,7 +537,7 @@ public class MappedFile extends ReferenceResource { public void munlock() { final long beginTime = System.currentTimeMillis(); - final long address = ((DirectBuffer)(this.mappedByteBuffer)).address(); + final long address = ((DirectBuffer) (this.mappedByteBuffer)).address(); Pointer pointer = new Pointer(address); int ret = LibC.INSTANCE.munlock(pointer, new NativeLong(this.fileSize)); log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
