http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java index 0fce69e..a9dadcc 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendResult.java @@ -19,7 +19,6 @@ package org.apache.rocketmq.client.producer; import com.alibaba.fastjson.JSON; import org.apache.rocketmq.common.message.MessageQueue; - public class SendResult { private SendStatus sendStatus; private String msgId; @@ -41,7 +40,8 @@ public class SendResult { this.queueOffset = queueOffset; } - public SendResult(final SendStatus sendStatus, final String msgId, final MessageQueue messageQueue, final long queueOffset, final String transactionId, final String offsetMsgId, final String regionId) { + public SendResult(final SendStatus sendStatus, final String msgId, final MessageQueue messageQueue, final long queueOffset, final String transactionId, + final String offsetMsgId, final String regionId) { this.sendStatus = sendStatus; this.msgId = msgId; this.messageQueue = messageQueue; @@ -51,6 +51,14 @@ public class SendResult { this.regionId = regionId; } + public static String encoderSendResultToJson(final Object obj) { + return JSON.toJSONString(obj); + } + + public static SendResult decoderSendResultFromJson(String json) { + return JSON.parseObject(json, SendResult.class); + } + public boolean isTraceOn() { return traceOn; } @@ -67,59 +75,42 @@ public class SendResult { this.regionId = regionId; } - public static String encoderSendResultToJson(final Object obj) { - return JSON.toJSONString(obj); - } - - public static SendResult decoderSendResultFromJson(String json) { - return JSON.parseObject(json, SendResult.class); - } - public String getMsgId() { return msgId; } - public void setMsgId(String msgId) { this.msgId = msgId; } - public SendStatus getSendStatus() { return sendStatus; } - public void setSendStatus(SendStatus sendStatus) { this.sendStatus = sendStatus; } - public MessageQueue getMessageQueue() { return messageQueue; } - public void setMessageQueue(MessageQueue messageQueue) { this.messageQueue = messageQueue; } - public long getQueueOffset() { return queueOffset; } - public void setQueueOffset(long queueOffset) { this.queueOffset = queueOffset; } - public String getTransactionId() { return transactionId; } - public void setTransactionId(String transactionId) { this.transactionId = transactionId; } @@ -135,6 +126,6 @@ public class SendResult { @Override public String toString() { return "SendResult [sendStatus=" + sendStatus + ", msgId=" + msgId + ", offsetMsgId=" + offsetMsgId + ", messageQueue=" + messageQueue - + ", queueOffset=" + queueOffset + "]"; + + ", queueOffset=" + queueOffset + "]"; } }
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java b/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java index a12c689..95a2711 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/SendStatus.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.producer; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java index 6da0737..bf312aa 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionCheckListener.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.producer; import org.apache.rocketmq.common.message.MessageExt; - public interface TransactionCheckListener { LocalTransactionState checkLocalTransactionState(final MessageExt msg); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java index b404216..1ea28e3 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.producer; @@ -20,18 +20,15 @@ import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.RPCHook; - public class TransactionMQProducer extends DefaultMQProducer { private TransactionCheckListener transactionCheckListener; private int checkThreadPoolMinSize = 1; private int checkThreadPoolMaxSize = 1; private int checkRequestHoldMax = 2000; - public TransactionMQProducer() { } - public TransactionMQProducer(final String producerGroup) { super(producerGroup); } @@ -46,17 +43,15 @@ public class TransactionMQProducer extends DefaultMQProducer { super.start(); } - @Override public void shutdown() { super.shutdown(); this.defaultMQProducerImpl.destroyTransactionEnv(); } - @Override public TransactionSendResult sendMessageInTransaction(final Message msg, - final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { + final LocalTransactionExecuter tranExecuter, final Object arg) throws MQClientException { if (null == this.transactionCheckListener) { throw new MQClientException("localTransactionBranchCheckListener is null", null); } @@ -64,42 +59,34 @@ public class TransactionMQProducer extends DefaultMQProducer { return this.defaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg); } - public TransactionCheckListener getTransactionCheckListener() { return transactionCheckListener; } - public void setTransactionCheckListener(TransactionCheckListener transactionCheckListener) { this.transactionCheckListener = transactionCheckListener; } - public int getCheckThreadPoolMinSize() { return checkThreadPoolMinSize; } - public void setCheckThreadPoolMinSize(int checkThreadPoolMinSize) { this.checkThreadPoolMinSize = checkThreadPoolMinSize; } - public int getCheckThreadPoolMaxSize() { return checkThreadPoolMaxSize; } - public void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize) { this.checkThreadPoolMaxSize = checkThreadPoolMaxSize; } - public int getCheckRequestHoldMax() { return checkRequestHoldMax; } - public void setCheckRequestHoldMax(int checkRequestHoldMax) { this.checkRequestHoldMax = checkRequestHoldMax; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java index ca9e713..c2395ab 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionSendResult.java @@ -6,29 +6,26 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.producer; public class TransactionSendResult extends SendResult { private LocalTransactionState localTransactionState; - public TransactionSendResult() { } - public LocalTransactionState getLocalTransactionState() { return localTransactionState; } - public void setLocalTransactionState(LocalTransactionState localTransactionState) { this.localTransactionState = localTransactionState; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java index 15c7e55..3574039 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByHash.java @@ -6,23 +6,21 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.producer.selector; +import java.util.List; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; -import java.util.List; - - public class SelectMessageQueueByHash implements MessageQueueSelector { @Override http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java index d2bf6f3..07f8b70 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByMachineRoom.java @@ -6,39 +6,34 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.producer.selector; +import java.util.List; +import java.util.Set; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; -import java.util.List; -import java.util.Set; - - public class SelectMessageQueueByMachineRoom implements MessageQueueSelector { private Set<String> consumeridcs; - @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { return null; } - public Set<String> getConsumeridcs() { return consumeridcs; } - public void setConsumeridcs(Set<String> consumeridcs) { this.consumeridcs = consumeridcs; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java index 38d04c1..685afc9 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/selector/SelectMessageQueueByRandoom.java @@ -6,28 +6,25 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.producer.selector; +import java.util.List; +import java.util.Random; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; -import java.util.List; -import java.util.Random; - - public class SelectMessageQueueByRandoom implements MessageQueueSelector { private Random random = new Random(System.currentTimeMillis()); - @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { int value = random.nextInt(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java b/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java index 3234ada..c723951 100644 --- a/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java +++ b/client/src/main/java/org/apache/rocketmq/client/stat/ConsumerStatsManager.java @@ -6,17 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client.stat; +import java.util.concurrent.ScheduledExecutorService; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.body.ConsumeStatus; import org.apache.rocketmq.common.stats.StatsItemSet; @@ -24,9 +25,6 @@ import org.apache.rocketmq.common.stats.StatsSnapshot; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ScheduledExecutorService; - - public class ConsumerStatsManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.CLIENT_LOGGER_NAME); @@ -42,53 +40,45 @@ public class ConsumerStatsManager { private final StatsItemSet topicAndGroupPullTPS; private final StatsItemSet topicAndGroupPullRT; - public ConsumerStatsManager(final ScheduledExecutorService scheduledExecutorService) { this.topicAndGroupConsumeOKTPS = - new StatsItemSet(TOPIC_AND_GROUP_CONSUME_OK_TPS, scheduledExecutorService, log); + new StatsItemSet(TOPIC_AND_GROUP_CONSUME_OK_TPS, scheduledExecutorService, log); this.topicAndGroupConsumeRT = - new StatsItemSet(TOPIC_AND_GROUP_CONSUME_RT, scheduledExecutorService, log); + new StatsItemSet(TOPIC_AND_GROUP_CONSUME_RT, scheduledExecutorService, log); this.topicAndGroupConsumeFailedTPS = - new StatsItemSet(TOPIC_AND_GROUP_CONSUME_FAILED_TPS, scheduledExecutorService, log); + new StatsItemSet(TOPIC_AND_GROUP_CONSUME_FAILED_TPS, scheduledExecutorService, log); this.topicAndGroupPullTPS = new StatsItemSet(TOPIC_AND_GROUP_PULL_TPS, scheduledExecutorService, log); this.topicAndGroupPullRT = new StatsItemSet(TOPIC_AND_GROUP_PULL_RT, scheduledExecutorService, log); } - public void start() { } - public void shutdown() { } - public void incPullRT(final String group, final String topic, final long rt) { - this.topicAndGroupPullRT.addValue(topic + "@" + group, (int) rt, 1); + this.topicAndGroupPullRT.addValue(topic + "@" + group, (int)rt, 1); } - public void incPullTPS(final String group, final String topic, final long msgs) { - this.topicAndGroupPullTPS.addValue(topic + "@" + group, (int) msgs, 1); + this.topicAndGroupPullTPS.addValue(topic + "@" + group, (int)msgs, 1); } - public void incConsumeRT(final String group, final String topic, final long rt) { - this.topicAndGroupConsumeRT.addValue(topic + "@" + group, (int) rt, 1); + this.topicAndGroupConsumeRT.addValue(topic + "@" + group, (int)rt, 1); } - public void incConsumeOKTPS(final String group, final String topic, final long msgs) { - this.topicAndGroupConsumeOKTPS.addValue(topic + "@" + group, (int) msgs, 1); + this.topicAndGroupConsumeOKTPS.addValue(topic + "@" + group, (int)msgs, 1); } - public void incConsumeFailedTPS(final String group, final String topic, final long msgs) { - this.topicAndGroupConsumeFailedTPS.addValue(topic + "@" + group, (int) msgs, 1); + this.topicAndGroupConsumeFailedTPS.addValue(topic + "@" + group, (int)msgs, 1); } public ConsumeStatus consumeStatus(final String group, final String topic) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/resources/logback_rocketmq_client.xml ---------------------------------------------------------------------- diff --git a/client/src/main/resources/logback_rocketmq_client.xml b/client/src/main/resources/logback_rocketmq_client.xml index a845ee4..94688ab 100644 --- a/client/src/main/resources/logback_rocketmq_client.xml +++ b/client/src/main/resources/logback_rocketmq_client.xml @@ -28,7 +28,7 @@ <maxIndex>${client.logFileMaxIndex}</maxIndex> </rollingPolicy> <triggeringPolicy - class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> + class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"> <maxFileSize>100MB</maxFileSize> </triggeringPolicy> <encoder> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java ---------------------------------------------------------------------- diff --git a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java index a3daba5..d4530c0 100644 --- a/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/ValidatorsTest.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.client; @@ -20,7 +20,6 @@ package org.apache.rocketmq.client; import org.apache.rocketmq.client.exception.MQClientException; import org.junit.Test; - public class ValidatorsTest { @Test http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/pom.xml ---------------------------------------------------------------------- diff --git a/common/pom.xml b/common/pom.xml index ec95a76..b46d2d5 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -15,7 +15,7 @@ limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.apache.rocketmq</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index e11122a..aaaa0c1 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -16,14 +16,12 @@ */ package org.apache.rocketmq.common; +import java.net.InetAddress; +import java.net.UnknownHostException; import org.apache.rocketmq.common.annotation.ImportantField; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.remoting.common.RemotingUtil; -import java.net.InetAddress; -import java.net.UnknownHostException; - - public class BrokerConfig { private String rocketmqHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); @ImportantField @@ -87,7 +85,6 @@ public class BrokerConfig { private boolean transferMsgByHeap = true; private int maxDelayTime = 40; - private String regionId = MixAll.DEFAULT_TRACE_REGION_ID; private int registerBrokerTimeoutMills = 6000; @@ -102,6 +99,16 @@ public class BrokerConfig { private boolean traceOn = true; + public static String localHostName() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + e.printStackTrace(); + } + + return "DEFAULT_BROKER"; + } + public boolean isTraceOn() { return traceOn; } @@ -150,16 +157,6 @@ public class BrokerConfig { this.slaveReadEnable = slaveReadEnable; } - public static String localHostName() { - try { - return InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException e) { - e.printStackTrace(); - } - - return "DEFAULT_BROKER"; - } - public int getRegisterBrokerTimeoutMills() { return registerBrokerTimeoutMills; } @@ -196,87 +193,70 @@ public class BrokerConfig { return highSpeedMode; } - public void setHighSpeedMode(final boolean highSpeedMode) { this.highSpeedMode = highSpeedMode; } - public String getRocketmqHome() { return rocketmqHome; } - public void setRocketmqHome(String rocketmqHome) { this.rocketmqHome = rocketmqHome; } - public String getBrokerName() { return brokerName; } - public void setBrokerName(String brokerName) { this.brokerName = brokerName; } - public int getBrokerPermission() { return brokerPermission; } - public void setBrokerPermission(int brokerPermission) { this.brokerPermission = brokerPermission; } - public int getDefaultTopicQueueNums() { return defaultTopicQueueNums; } - public void setDefaultTopicQueueNums(int defaultTopicQueueNums) { this.defaultTopicQueueNums = defaultTopicQueueNums; } - public boolean isAutoCreateTopicEnable() { return autoCreateTopicEnable; } - public void setAutoCreateTopicEnable(boolean autoCreateTopic) { this.autoCreateTopicEnable = autoCreateTopic; } - public String getBrokerClusterName() { return brokerClusterName; } - public void setBrokerClusterName(String brokerClusterName) { this.brokerClusterName = brokerClusterName; } - public String getBrokerIP1() { return brokerIP1; } - public void setBrokerIP1(String brokerIP1) { this.brokerIP1 = brokerIP1; } - public String getBrokerIP2() { return brokerIP2; } - public void setBrokerIP2(String brokerIP2) { this.brokerIP2 = brokerIP2; } @@ -289,192 +269,154 @@ public class BrokerConfig { this.sendMessageThreadPoolNums = sendMessageThreadPoolNums; } - public int getPullMessageThreadPoolNums() { return pullMessageThreadPoolNums; } - public void setPullMessageThreadPoolNums(int pullMessageThreadPoolNums) { this.pullMessageThreadPoolNums = pullMessageThreadPoolNums; } - public int getAdminBrokerThreadPoolNums() { return adminBrokerThreadPoolNums; } - public void setAdminBrokerThreadPoolNums(int adminBrokerThreadPoolNums) { this.adminBrokerThreadPoolNums = adminBrokerThreadPoolNums; } - public int getFlushConsumerOffsetInterval() { return flushConsumerOffsetInterval; } - public void setFlushConsumerOffsetInterval(int flushConsumerOffsetInterval) { this.flushConsumerOffsetInterval = flushConsumerOffsetInterval; } - public int getFlushConsumerOffsetHistoryInterval() { return flushConsumerOffsetHistoryInterval; } - public void setFlushConsumerOffsetHistoryInterval(int flushConsumerOffsetHistoryInterval) { this.flushConsumerOffsetHistoryInterval = flushConsumerOffsetHistoryInterval; } - public boolean isClusterTopicEnable() { return clusterTopicEnable; } - public void setClusterTopicEnable(boolean clusterTopicEnable) { this.clusterTopicEnable = clusterTopicEnable; } - public String getNamesrvAddr() { return namesrvAddr; } - public void setNamesrvAddr(String namesrvAddr) { this.namesrvAddr = namesrvAddr; } - public long getBrokerId() { return brokerId; } - public void setBrokerId(long brokerId) { this.brokerId = brokerId; } - public boolean isAutoCreateSubscriptionGroup() { return autoCreateSubscriptionGroup; } - public void setAutoCreateSubscriptionGroup(boolean autoCreateSubscriptionGroup) { this.autoCreateSubscriptionGroup = autoCreateSubscriptionGroup; } - public boolean isRejectTransactionMessage() { return rejectTransactionMessage; } - public void setRejectTransactionMessage(boolean rejectTransactionMessage) { this.rejectTransactionMessage = rejectTransactionMessage; } - public boolean isFetchNamesrvAddrByAddressServer() { return fetchNamesrvAddrByAddressServer; } - public void setFetchNamesrvAddrByAddressServer(boolean fetchNamesrvAddrByAddressServer) { this.fetchNamesrvAddrByAddressServer = fetchNamesrvAddrByAddressServer; } - public int getSendThreadPoolQueueCapacity() { return sendThreadPoolQueueCapacity; } - public void setSendThreadPoolQueueCapacity(int sendThreadPoolQueueCapacity) { this.sendThreadPoolQueueCapacity = sendThreadPoolQueueCapacity; } - public int getPullThreadPoolQueueCapacity() { return pullThreadPoolQueueCapacity; } - public void setPullThreadPoolQueueCapacity(int pullThreadPoolQueueCapacity) { this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity; } - public boolean isBrokerTopicEnable() { return brokerTopicEnable; } - public void setBrokerTopicEnable(boolean brokerTopicEnable) { this.brokerTopicEnable = brokerTopicEnable; } - public int getFilterServerNums() { return filterServerNums; } - public void setFilterServerNums(int filterServerNums) { this.filterServerNums = filterServerNums; } - public boolean isLongPollingEnable() { return longPollingEnable; } - public void setLongPollingEnable(boolean longPollingEnable) { this.longPollingEnable = longPollingEnable; } - public boolean isNotifyConsumerIdsChangedEnable() { return notifyConsumerIdsChangedEnable; } - public void setNotifyConsumerIdsChangedEnable(boolean notifyConsumerIdsChangedEnable) { this.notifyConsumerIdsChangedEnable = notifyConsumerIdsChangedEnable; } - public long getShortPollingTimeMills() { return shortPollingTimeMills; } - public void setShortPollingTimeMills(long shortPollingTimeMills) { this.shortPollingTimeMills = shortPollingTimeMills; } - public int getClientManageThreadPoolNums() { return clientManageThreadPoolNums; } - public void setClientManageThreadPoolNums(int clientManageThreadPoolNums) { this.clientManageThreadPoolNums = clientManageThreadPoolNums; } - public boolean isCommercialEnable() { return commercialEnable; } - public void setCommercialEnable(final boolean commercialEnable) { this.commercialEnable = commercialEnable; } @@ -507,7 +449,6 @@ public class BrokerConfig { return maxDelayTime; } - public void setMaxDelayTime(final int maxDelayTime) { this.maxDelayTime = maxDelayTime; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/BrokerConfigSingleton.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfigSingleton.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfigSingleton.java index fc73b71..1dae6d4 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfigSingleton.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfigSingleton.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java index 1ade521..0510b08 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java +++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java @@ -6,27 +6,24 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common; +import java.io.IOException; import org.apache.rocketmq.common.constant.LoggerName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - - public abstract class ConfigManager { private static final Logger PLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); - public abstract String encode(); public boolean load() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/Configuration.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/Configuration.java b/common/src/main/java/org/apache/rocketmq/common/Configuration.java index 1f81ba6..5402057 100644 --- a/common/src/main/java/org/apache/rocketmq/common/Configuration.java +++ b/common/src/main/java/org/apache/rocketmq/common/Configuration.java @@ -6,19 +6,17 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common; -import org.slf4j.Logger; - import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; @@ -27,6 +25,7 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.slf4j.Logger; public class Configuration { @@ -135,7 +134,7 @@ public class Configuration { // check this.storePathField = object.getClass().getDeclaredField(fieldName); assert this.storePathField != null - && !Modifier.isStatic(this.storePathField.getModifiers()); + && !Modifier.isStatic(this.storePathField.getModifiers()); this.storePathField.setAccessible(true); } catch (NoSuchFieldException e) { throw new RuntimeException(e); @@ -157,7 +156,7 @@ public class Configuration { if (this.storePathFromConfig) { try { - realStorePath = (String) storePathField.get(this.storePathObject); + realStorePath = (String)storePathField.get(this.storePathObject); } catch (IllegalAccessException e) { log.error("getStorePath error, ", e); } @@ -172,6 +171,10 @@ public class Configuration { return realStorePath; } + public void setStorePath(final String storePath) { + this.storePath = storePath; + } + public void update(Properties properties) { try { readWriteLock.writeLock().lockInterruptibly(); @@ -276,10 +279,6 @@ public class Configuration { return stringBuilder.toString(); } - public void setStorePath(final String storePath) { - this.storePath = storePath; - } - private void merge(Properties from, Properties to) { for (Object key : from.keySet()) { Object fromObj = from.get(key), toObj = to.get(key); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/CountDownLatch2.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/CountDownLatch2.java b/common/src/main/java/org/apache/rocketmq/common/CountDownLatch2.java index 23c27ac..c3e3372 100644 --- a/common/src/main/java/org/apache/rocketmq/common/CountDownLatch2.java +++ b/common/src/main/java/org/apache/rocketmq/common/CountDownLatch2.java @@ -22,62 +22,19 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer; /** * Add reset feature for @see java.util.concurrent.CountDownLatch2 - * */ public class CountDownLatch2 { - /** - * Synchronization control For CountDownLatch2. - * Uses AQS state to represent count. - */ - private static final class Sync extends AbstractQueuedSynchronizer { - private static final long serialVersionUID = 4982264981922014374L; - - private final int startCount; - - Sync(int count) { - this.startCount = count; - setState(count); - } - - int getCount() { - return getState(); - } - - protected int tryAcquireShared(int acquires) { - return (getState() == 0) ? 1 : -1; - } - - protected boolean tryReleaseShared(int releases) { - // Decrement count; signal when transition to zero - for (;;) { - int c = getState(); - if (c == 0) - return false; - int nextc = c - 1; - if (compareAndSetState(c, nextc)) - return nextc == 0; - } - } - - protected void reset() { - setState(startCount); - } - } - private final Sync sync; /** * Constructs a {@code CountDownLatch2} initialized with the given count. * - * @param count - * the number of times {@link #countDown} must be invoked - * before threads can pass through {@link #await} - * - * @throws IllegalArgumentException - * if {@code count} is negative + * @param count the number of times {@link #countDown} must be invoked before threads can pass through {@link #await} + * @throws IllegalArgumentException if {@code count} is negative */ public CountDownLatch2(int count) { - if (count < 0) throw new IllegalArgumentException("count < 0"); + if (count < 0) + throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } @@ -105,9 +62,7 @@ public class CountDownLatch2 { * then {@link InterruptedException} is thrown and the current thread's * interrupted status is cleared. * - * @throws InterruptedException - * if the current thread is interrupted - * while waiting + * @throws InterruptedException if the current thread is interrupted while waiting */ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); @@ -147,20 +102,13 @@ public class CountDownLatch2 { * is returned. If the time is less than or equal to zero, the method * will not wait at all. * - * @param timeout - * the maximum time to wait - * @param unit - * the time unit of the {@code timeout} argument - * - * @return {@code true} if the count reached zero and {@code false} - * if the waiting time elapsed before the count reached zero - * - * @throws InterruptedException - * if the current thread is interrupted - * while waiting + * @param timeout the maximum time to wait + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if the count reached zero and {@code false} if the waiting time elapsed before the count reached zero + * @throws InterruptedException if the current thread is interrupted while waiting */ public boolean await(long timeout, TimeUnit unit) - throws InterruptedException { + throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } @@ -203,4 +151,43 @@ public class CountDownLatch2 { public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } + + /** + * Synchronization control For CountDownLatch2. + * Uses AQS state to represent count. + */ + private static final class Sync extends AbstractQueuedSynchronizer { + private static final long serialVersionUID = 4982264981922014374L; + + private final int startCount; + + Sync(int count) { + this.startCount = count; + setState(count); + } + + int getCount() { + return getState(); + } + + protected int tryAcquireShared(int acquires) { + return (getState() == 0) ? 1 : -1; + } + + protected boolean tryReleaseShared(int releases) { + // Decrement count; signal when transition to zero + for (; ; ) { + int c = getState(); + if (c == 0) + return false; + int nextc = c - 1; + if (compareAndSetState(c, nextc)) + return nextc == 0; + } + } + + protected void reset() { + setState(startCount); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/DataVersion.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java index 1bb223f..f3e4615 100644 --- a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java +++ b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java @@ -6,73 +6,67 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common; -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; - import java.util.concurrent.atomic.AtomicLong; - +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; public class DataVersion extends RemotingSerializable { private long timestatmp = System.currentTimeMillis(); private AtomicLong counter = new AtomicLong(0); - public void assignNewOne(final DataVersion dataVersion) { this.timestatmp = dataVersion.timestatmp; this.counter.set(dataVersion.counter.get()); } - public void nextVersion() { this.timestatmp = System.currentTimeMillis(); this.counter.incrementAndGet(); } - public long getTimestatmp() { return timestatmp; } - public void setTimestatmp(long timestatmp) { this.timestatmp = timestatmp; } - public AtomicLong getCounter() { return counter; } - public void setCounter(AtomicLong counter) { this.counter = counter; } - @Override public boolean equals(final Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; - final DataVersion that = (DataVersion) o; + final DataVersion that = (DataVersion)o; - if (timestatmp != that.timestatmp) return false; + if (timestatmp != that.timestatmp) + return false; return counter != null ? counter.equals(that.counter) : that.counter == null; } @Override public int hashCode() { - int result = (int) (timestatmp ^ (timestatmp >>> 32)); + int result = (int)(timestatmp ^ (timestatmp >>> 32)); result = 31 * result + (counter != null ? counter.hashCode() : 0); return result; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/MQVersion.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java index 3543161..d12ee55 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java +++ b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java @@ -20,7 +20,6 @@ public class MQVersion { public static final int CURRENT_VERSION = Version.V4_0_0_SNAPSHOT.ordinal(); - public static String getVersionDesc(int value) { try { Version v = Version.values()[value]; @@ -31,7 +30,6 @@ public class MQVersion { return "HigherVersion"; } - public static Version value2Version(int value) { return Version.values()[value]; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/MixAll.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java index bca55f5..8307dd5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java @@ -16,10 +16,6 @@ */ package org.apache.rocketmq.common; -import org.apache.rocketmq.common.annotation.ImportantField; -import org.apache.rocketmq.common.help.FAQUrl; -import org.slf4j.Logger; - import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileReader; @@ -46,7 +42,9 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; - +import org.apache.rocketmq.common.annotation.ImportantField; +import org.apache.rocketmq.common.help.FAQUrl; +import org.slf4j.Logger; public class MixAll { public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME"; @@ -94,7 +92,6 @@ public class MixAll { return RETRY_GROUP_TOPIC_PREFIX + consumerGroup; } - public static boolean isSysConsumerGroup(final String consumerGroup) { return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX); } @@ -107,7 +104,6 @@ public class MixAll { return DLQ_GROUP_TOPIC_PREFIX + consumerGroup; } - public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) { if (isChange) { String[] ipAndPort = brokerAddr.split(":"); @@ -118,7 +114,6 @@ public class MixAll { } } - public static long getPID() { String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName(); if (processName != null && processName.length() > 0) { @@ -132,7 +127,6 @@ public class MixAll { return 0; } - public static long createBrokerId(final String ip, final int port) { InetSocketAddress isa = new InetSocketAddress(ip, port); byte[] ipArray = isa.getAddress().getAddress(); @@ -148,23 +142,19 @@ public class MixAll { String tmpFile = fileName + ".tmp"; string2FileNotSafe(str, tmpFile); - String bakFile = fileName + ".bak"; String prevContent = file2String(fileName); if (prevContent != null) { string2FileNotSafe(prevContent, bakFile); } - File file = new File(fileName); file.delete(); - file = new File(tmpFile); file.renameTo(new File(fileName)); } - public static final void string2FileNotSafe(final String str, final String fileName) throws IOException { File file = new File(fileName); File fileParent = file.getParentFile(); @@ -189,7 +179,6 @@ public class MixAll { } } - public static final String file2String(final String fileName) { File file = new File(fileName); return file2String(file); @@ -197,7 +186,7 @@ public class MixAll { public static final String file2String(final File file) { if (file.exists()) { - char[] data = new char[(int) file.length()]; + char[] data = new char[(int)file.length()]; boolean result = false; FileReader fileReader = null; @@ -252,12 +241,10 @@ public class MixAll { return url.getPath(); } - public static void printObjectProperties(final Logger log, final Object object) { printObjectProperties(log, object, false); } - public static void printObjectProperties(final Logger log, final Object object, final boolean onlyImportantField) { Field[] fields = object.getClass().getDeclaredFields(); for (Field field : fields) { @@ -293,7 +280,6 @@ public class MixAll { } } - public static String properties2String(final Properties properties) { StringBuilder sb = new StringBuilder(); for (Map.Entry<Object, Object> entry : properties.entrySet()) { @@ -379,7 +365,7 @@ public class MixAll { } else { continue; } - method.invoke(object, new Object[]{arg}); + method.invoke(object, new Object[] {arg}); } } } catch (Throwable e) { @@ -388,12 +374,10 @@ public class MixAll { } } - public static boolean isPropertiesEqual(final Properties p1, final Properties p2) { return p1.equals(p2); } - public static List<String> getLocalInetAddress() { List<String> inetAddressList = new ArrayList<String>(); try { @@ -412,7 +396,6 @@ public class MixAll { return inetAddressList; } - public static boolean isLocalAddr(String address) { for (String addr : LOCAL_INET_ADDRESS) { if (address.contains(addr)) @@ -421,19 +404,17 @@ public class MixAll { return false; } - private static String localhost() { try { InetAddress addr = InetAddress.getLocalHost(); return addr.getHostAddress(); } catch (Throwable e) { throw new RuntimeException("InetAddress java.net.InetAddress.getLocalHost() throws UnknownHostException" - + FAQUrl.suggestTodo(FAQUrl.UNKNOWN_HOST_EXCEPTION), - e); + + FAQUrl.suggestTodo(FAQUrl.UNKNOWN_HOST_EXCEPTION), + e); } } - public static boolean compareAndIncreaseOnly(final AtomicLong target, final long value) { long prev = target.get(); while (value > prev) { @@ -452,11 +433,20 @@ public class MixAll { return InetAddress.getLocalHost().getHostName(); } catch (Throwable e) { throw new RuntimeException("InetAddress java.net.InetAddress.getLocalHost() throws UnknownHostException" - + FAQUrl.suggestTodo(FAQUrl.UNKNOWN_HOST_EXCEPTION), - e); + + FAQUrl.suggestTodo(FAQUrl.UNKNOWN_HOST_EXCEPTION), + e); } } + public static String humanReadableByteCount(long bytes, boolean si) { + int unit = si ? 1000 : 1024; + if (bytes < unit) + return bytes + " B"; + int exp = (int)(Math.log(bytes) / Math.log(unit)); + String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp - 1) + (si ? "" : "i"); + return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre); + } + public Set<String> list2Set(List<String> values) { Set<String> result = new HashSet<String>(); for (String v : values) { @@ -472,12 +462,4 @@ public class MixAll { } return result; } - - public static String humanReadableByteCount(long bytes, boolean si) { - int unit = si ? 1000 : 1024; - if (bytes < unit) return bytes + " B"; - int exp = (int) (Math.log(bytes) / Math.log(unit)); - String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp - 1) + (si ? "" : "i"); - return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre); - } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/Pair.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/Pair.java b/common/src/main/java/org/apache/rocketmq/common/Pair.java index 45f6dee..805d0a3 100644 --- a/common/src/main/java/org/apache/rocketmq/common/Pair.java +++ b/common/src/main/java/org/apache/rocketmq/common/Pair.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common; @@ -20,28 +20,23 @@ public class Pair<T1, T2> { private T1 object1; private T2 object2; - public Pair(T1 object1, T2 object2) { this.object1 = object1; this.object2 = object2; } - public T1 getObject1() { return object1; } - public void setObject1(T1 object1) { this.object1 = object1; } - public T2 getObject2() { return object2; } - public void setObject2(T2 object2) { this.object2 = object2; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/ServiceState.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/ServiceState.java b/common/src/main/java/org/apache/rocketmq/common/ServiceState.java index c8fbfea..53dd75b 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ServiceState.java +++ b/common/src/main/java/org/apache/rocketmq/common/ServiceState.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java index c0ae430..7d29868 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java +++ b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java @@ -6,49 +6,41 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.common.constant.LoggerName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - public abstract class ServiceThread implements Runnable { private static final Logger STLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); private static final long JOIN_TIME = 90 * 1000; protected final Thread thread; - + protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false); - protected volatile boolean stopped = false; - protected final CountDownLatch2 waitPoint = new CountDownLatch2(1); - - public ServiceThread() { this.thread = new Thread(this, this.getServiceName()); } - public abstract String getServiceName(); - public void start() { this.thread.start(); } - public void shutdown() { this.shutdown(false); } @@ -72,7 +64,7 @@ public abstract class ServiceThread implements Runnable { } long eclipseTime = System.currentTimeMillis() - beginTime; STLOG.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " " - + this.getJointime()); + + this.getJointime()); } catch (InterruptedException e) { e.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/SystemClock.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/SystemClock.java b/common/src/main/java/org/apache/rocketmq/common/SystemClock.java index 5abc805..ff14915 100644 --- a/common/src/main/java/org/apache/rocketmq/common/SystemClock.java +++ b/common/src/main/java/org/apache/rocketmq/common/SystemClock.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java index 43ab2f2..e417da8 100644 --- a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java +++ b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common; @@ -20,17 +20,14 @@ package org.apache.rocketmq.common; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; - public class ThreadFactoryImpl implements ThreadFactory { private final AtomicLong threadIndex = new AtomicLong(0); private final String threadNamePrefix; - public ThreadFactoryImpl(final String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix; } - @Override public Thread newThread(Runnable r) { return new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java index 9eec278..2582b9f 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java @@ -6,19 +6,18 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common; import org.apache.rocketmq.common.constant.PermName; - public class TopicConfig { private static final String SEPARATOR = " "; public static int defaultReadQueueNums = 16; @@ -31,16 +30,13 @@ public class TopicConfig { private int topicSysFlag = 0; private boolean order = false; - public TopicConfig() { } - public TopicConfig(String topicName) { this.topicName = topicName; } - public TopicConfig(String topicName, int readQueueNums, int writeQueueNums, int perm) { this.topicName = topicName; this.readQueueNums = readQueueNums; @@ -48,7 +44,6 @@ public class TopicConfig { this.perm = perm; } - public String encode() { StringBuilder sb = new StringBuilder(); @@ -74,7 +69,6 @@ public class TopicConfig { return sb.toString(); } - public boolean decode(final String in) { String[] strs = in.split(SEPARATOR); if (strs != null && strs.length == 5) { @@ -94,89 +88,83 @@ public class TopicConfig { return false; } - public String getTopicName() { return topicName; } - public void setTopicName(String topicName) { this.topicName = topicName; } - public int getReadQueueNums() { return readQueueNums; } - public void setReadQueueNums(int readQueueNums) { this.readQueueNums = readQueueNums; } - public int getWriteQueueNums() { return writeQueueNums; } - public void setWriteQueueNums(int writeQueueNums) { this.writeQueueNums = writeQueueNums; } - public int getPerm() { return perm; } - public void setPerm(int perm) { this.perm = perm; } - public TopicFilterType getTopicFilterType() { return topicFilterType; } - public void setTopicFilterType(TopicFilterType topicFilterType) { this.topicFilterType = topicFilterType; } - public int getTopicSysFlag() { return topicSysFlag; } - public void setTopicSysFlag(int topicSysFlag) { this.topicSysFlag = topicSysFlag; } - public boolean isOrder() { return order; } - public void setOrder(boolean isOrder) { this.order = isOrder; } @Override public boolean equals(final Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - final TopicConfig that = (TopicConfig) o; - - if (readQueueNums != that.readQueueNums) return false; - if (writeQueueNums != that.writeQueueNums) return false; - if (perm != that.perm) return false; - if (topicSysFlag != that.topicSysFlag) return false; - if (order != that.order) return false; - if (topicName != null ? !topicName.equals(that.topicName) : that.topicName != null) return false; + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + final TopicConfig that = (TopicConfig)o; + + if (readQueueNums != that.readQueueNums) + return false; + if (writeQueueNums != that.writeQueueNums) + return false; + if (perm != that.perm) + return false; + if (topicSysFlag != that.topicSysFlag) + return false; + if (order != that.order) + return false; + if (topicName != null ? !topicName.equals(that.topicName) : that.topicName != null) + return false; return topicFilterType == that.topicFilterType; } @@ -196,8 +184,8 @@ public class TopicConfig { @Override public String toString() { return "TopicConfig [topicName=" + topicName + ", readQueueNums=" + readQueueNums - + ", writeQueueNums=" + writeQueueNums + ", perm=" + PermName.perm2String(perm) - + ", topicFilterType=" + topicFilterType + ", topicSysFlag=" + topicSysFlag + ", order=" - + order + "]"; + + ", writeQueueNums=" + writeQueueNums + ", perm=" + PermName.perm2String(perm) + + ", topicFilterType=" + topicFilterType + ", topicSysFlag=" + topicSysFlag + ", order=" + + order + "]"; } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java b/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java index cd3490a..182ecc7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java +++ b/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java @@ -6,13 +6,13 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.common;