http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java index 6fc7335..51a8a27 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java @@ -85,7 +85,6 @@ public class HAService { return result; } - public void notifyTransferSome(final long offset) { for (long value = this.push2SlaveMaxOffset.get(); offset > value; ) { boolean ok = this.push2SlaveMaxOffset.compareAndSet(value, offset); @@ -180,7 +179,9 @@ public class HAService { this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public void shutdown(final boolean interrupt) { super.shutdown(interrupt); @@ -192,7 +193,9 @@ public class HAService { } } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public void run() { log.info(this.getServiceName() + " service started"); @@ -235,7 +238,9 @@ public class HAService { log.info(this.getServiceName() + " service end"); } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override public String getServiceName() { return AcceptSocketService.class.getSimpleName();
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java index 862e620..edc2476 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexFile.java @@ -209,7 +209,7 @@ public class IndexFile { if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() || this.indexHeader.getIndexCount() <= 1) { } else { - for (int nextIndexToRead = slotValue;;) { + for (int nextIndexToRead = slotValue; ; ) { if (phyOffsets.size() >= maxNum) { break; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java index 3195448..44021cd 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexHeader.java @@ -20,7 +20,6 @@ import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - public class IndexHeader { public static final int INDEX_HEADER_SIZE = 40; private static int beginTimestampIndex = 0; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java index c434df5..e562c2a 100644 --- a/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java +++ b/store/src/main/java/org/apache/rocketmq/store/index/IndexService.java @@ -35,7 +35,9 @@ import org.slf4j.LoggerFactory; public class IndexService { private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME); - /** Maximum times to attempt index file creation. */ + /** + * Maximum times to attempt index file creation. + */ private static final int MAX_TRY_IDX_CREATE = 3; private final DefaultMessageStore defaultMessageStore; private final int hashSlotNum; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java index 64b4097..a3240a4 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java @@ -38,7 +38,6 @@ public class BrokerStats { this.defaultMessageStore = defaultMessageStore; } - public void record() { this.msgPutTotalYesterdayMorning = this.msgPutTotalTodayMorning; this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning; http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java ---------------------------------------------------------------------- diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java index 1515eb4..64f76ca 100644 --- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java +++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java @@ -123,9 +123,11 @@ public class BrokerStatsManager { public void incTopicPutNums(final String topic) { this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, 1, 1); } + public void incTopicPutNums(final String topic, int num, int times) { this.statsTable.get(TOPIC_PUT_NUMS).addValue(topic, num, times); } + public void incTopicPutSize(final String topic, final int size) { this.statsTable.get(TOPIC_PUT_SIZE).addValue(topic, size, 1); } @@ -156,9 +158,11 @@ public class BrokerStatsManager { public void incBrokerPutNums() { this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().incrementAndGet(); } + public void incBrokerPutNums(final int incValue) { this.statsTable.get(BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue); } + public void incBrokerGetNums(final int incValue) { this.statsTable.get(BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().addAndGet(incValue); } @@ -173,12 +177,14 @@ public class BrokerStatsManager { return this.statsTable.get(GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps(); } - public void recordDiskFallBehindTime(final String group, final String topic, final int queueId, final long fallBehind) { + public void recordDiskFallBehindTime(final String group, final String topic, final int queueId, + final long fallBehind) { final String statsKey = String.format("%d@%s@%s", queueId, topic, group); this.momentStatsItemSetFallTime.getAndCreateStatsItem(statsKey).getValue().set(fallBehind); } - public void recordDiskFallBehindSize(final String group, final String topic, final int queueId, final long fallBehind) { + public void recordDiskFallBehindSize(final String group, final String topic, final int queueId, + final long fallBehind) { final String statsKey = String.format("%d@%s@%s", queueId, topic, group); this.momentStatsItemSetFallSize.getAndCreateStatsItem(statsKey).getValue().set(fallBehind); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java index eaa18d5..7f88d36 100644 --- a/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/AppendCallbackTest.java @@ -6,18 +6,17 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.rocketmq.store; - import java.io.File; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -39,7 +38,6 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; - public class AppendCallbackTest { AppendMessageCallback callback; @@ -47,7 +45,7 @@ public class AppendCallbackTest { CommitLog.MessageExtBatchEncoder batchEncoder = new CommitLog.MessageExtBatchEncoder(10 * 1024 * 1024); @Before - public void init() throws Exception{ + public void init() throws Exception { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setMapedFileSizeCommitLog(1024 * 8); messageStoreConfig.setMapedFileSizeConsumeQueue(1024 * 4); @@ -62,16 +60,15 @@ public class AppendCallbackTest { } @After - public void destroy(){ + public void destroy() { UtilAll.deleteFile(new File(System.getProperty("user.home") + File.separator + "unitteststore")); } - @Test - public void testAppendMessageBatchEndOfFile() throws Exception{ - List<Message> messages = new ArrayList<>(); + public void testAppendMessageBatchEndOfFile() throws Exception { + List<Message> messages = new ArrayList<>(); String topic = "test-topic"; - int queue= 0; + int queue = 0; for (int i = 0; i < 10; i++) { Message msg = new Message(); msg.setBody("body".getBytes()); @@ -83,8 +80,8 @@ public class AppendCallbackTest { messageExtBatch.setTopic(topic); messageExtBatch.setQueueId(queue); messageExtBatch.setBornTimestamp(System.currentTimeMillis()); - messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1",123)); - messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1",124)); + messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 123)); + messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); @@ -99,11 +96,12 @@ public class AppendCallbackTest { assertTrue(result.getMsgId().length() > 0); //should have already constructed some message ids } + @Test public void testAppendMessageBatchSucc() throws Exception { - List<Message> messages = new ArrayList<>(); + List<Message> messages = new ArrayList<>(); String topic = "test-topic"; - int queue= 0; + int queue = 0; for (int i = 0; i < 10; i++) { Message msg = new Message(); msg.setBody("body".getBytes()); @@ -115,8 +113,8 @@ public class AppendCallbackTest { messageExtBatch.setTopic(topic); messageExtBatch.setQueueId(queue); messageExtBatch.setBornTimestamp(System.currentTimeMillis()); - messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1",123)); - messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1",124)); + messageExtBatch.setBornHost(new InetSocketAddress("127.0.0.1", 123)); + messageExtBatch.setStoreHost(new InetSocketAddress("127.0.0.1", 124)); messageExtBatch.setBody(MessageDecoder.encodeMessages(messages)); messageExtBatch.setEncodedBuff(batchEncoder.encode(messageExtBatch)); @@ -131,7 +129,7 @@ public class AppendCallbackTest { assertEquals(messages.size(), allresult.getMsgNum()); Set<String> msgIds = new HashSet<>(); - for (String msgId: allresult.getMsgId().split(",")) { + for (String msgId : allresult.getMsgId().split(",")) { assertEquals(32, msgId.length()); msgIds.add(msgId); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java index 6c2f5ad..e213a02 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueExtTest.java @@ -36,7 +36,6 @@ public class ConsumeQueueExtTest { private static final int cqExtFileSize = 10 * unitSizeWithBitMap; private static final int unitCount = 20; - protected ConsumeQueueExt genExt() { return new ConsumeQueueExt( topic, queueId, storePath, cqExtFileSize, bitMapLength @@ -65,7 +64,7 @@ public class ConsumeQueueExtTest { } protected void putSth(ConsumeQueueExt consumeQueueExt, boolean getAfterPut, - boolean unitSameSize, int unitCount) { + boolean unitSameSize, int unitCount) { for (int i = 0; i < unitCount; i++) { ConsumeQueueExt.CqExtUnit putUnit = unitSameSize ? genUnit(true) : genUnit(i % 2 == 0); @@ -236,7 +235,7 @@ public class ConsumeQueueExtTest { } @After - public void destroy(){ + public void destroy() { UtilAll.deleteFile(new File(storePath)); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java index d07a768..b03f2fc 100644 --- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java @@ -83,9 +83,8 @@ public class ConsumeQueueTest { return msg; } - public MessageStoreConfig buildStoreConfig(int commitLogFileSize, int cqFileSize, - boolean enableCqExt, int cqExtFileSize) { + boolean enableCqExt, int cqExtFileSize) { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setMapedFileSizeCommitLog(commitLogFileSize); messageStoreConfig.setMapedFileSizeConsumeQueue(cqFileSize); @@ -112,7 +111,7 @@ public class ConsumeQueueTest { new MessageArrivingListener() { @Override public void arriving(String topic, int queueId, long logicOffset, long tagsCode, - long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { + long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) { } } , brokerConfig); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java ---------------------------------------------------------------------- diff --git a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java index a81f328..28d7478 100644 --- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java @@ -56,7 +56,7 @@ public class DefaultMessageStoreTest { File file = new File(messageStoreConfig.getStorePathRootDir()); UtilAll.deleteFile(file); } - + public MessageStore buildMessageStore() throws Exception { MessageStoreConfig messageStoreConfig = new MessageStoreConfig(); messageStoreConfig.setMapedFileSizeCommitLog(1024 * 1024 * 10); @@ -149,7 +149,6 @@ public class DefaultMessageStoreTest { GetMessageResult getMessageResult32 = messageStore.getMessage(group, topic, 0, 0, 32, null); assertThat(getMessageResult32.getMessageBufferList().size()).isEqualTo(32); - GetMessageResult getMessageResult20 = messageStore.getMessage(group, topic, 0, 0, 20, null); assertThat(getMessageResult20.getMessageBufferList().size()).isEqualTo(20); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java index cb0210f..3c03ee7 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.test.listener.AbstractListener; public class RMQSqlConsumer extends RMQNormalConsumer { private static Logger logger = Logger.getLogger(RMQSqlConsumer.class); private MessageSelector selector; + public RMQSqlConsumer(String nsAddr, String topic, MessageSelector selector, String consumerGroup, AbstractListener listener) { super(nsAddr, topic, "*", consumerGroup, listener); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java index 64911fb..e1b8c91 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -55,7 +55,8 @@ public class IntegrationTestBase { static { Runtime.getRuntime().addShutdownHook(new Thread() { - @Override public void run() { + @Override + public void run() { try { for (BrokerController brokerController : BROKER_CONTROLLERS) { if (brokerController != null) { @@ -78,7 +79,7 @@ public class IntegrationTestBase { for (File file : TMPE_FILES) { UtilAll.deleteFile(file); } - } catch (Exception e){ + } catch (Exception e) { logger.error("Shutdown error", e); } } @@ -149,7 +150,7 @@ public class IntegrationTestBase { return brokerController; } - public static boolean initTopic(String topic, String nsAddr, String clusterName,int queueNumbers){ + public static boolean initTopic(String topic, String nsAddr, String clusterName, int queueNumbers) { long startTime = System.currentTimeMillis(); boolean createResult; @@ -159,7 +160,7 @@ public class IntegrationTestBase { break; } else if (System.currentTimeMillis() - startTime > topicCreateTime) { Assert.fail(String.format("topic[%s] is created failed after:%d ms", topic, - System.currentTimeMillis() - startTime)); + System.currentTimeMillis() - startTime)); break; } else { TestUtils.waitForMoment(500); @@ -171,7 +172,7 @@ public class IntegrationTestBase { } public static boolean initTopic(String topic, String nsAddr, String clusterName) { - return initTopic(topic, nsAddr, clusterName,8); + return initTopic(topic, nsAddr, clusterName, 8); } public static void deleteFile(File file) { @@ -188,5 +189,5 @@ public class IntegrationTestBase { file.delete(); } } - + } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java index 47cde74..9d8aeb3 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/broadcast/order/OrderMsgBroadCastIT.java @@ -62,7 +62,6 @@ public class OrderMsgBroadCastIT extends BaseBroadCastIT { consumer1.getConsumerGroup(), topic, "*", new RMQOrderListener()); TestUtils.waitForSeconds(waitTime); - List<MessageQueue> mqs = producer.getMessageQueue(); MessageQueueMsg mqMsgs = new MessageQueueMsg(mqs, msgSize); producer.send(mqMsgs.getMsgsWithMQ()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java index 115595d..15d91a1 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java @@ -67,7 +67,7 @@ public class SqlFilterIT extends BaseConf { consumer.getListener().waitForMessageConsume(msgSize * 2, consumeTime); assertThat(producer.getAllMsgBody()) .containsAllIn(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), - consumer.getListener().getAllMsgBody())); + consumer.getListener().getAllMsgBody())); assertThat(consumer.getListener().getAllMsgBody().size()).isEqualTo(msgSize * 2); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java index e372a1b..6fb34af 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/producer/batch/BatchSendIT.java @@ -77,7 +77,6 @@ public class BatchSendIT extends BaseConf { } } - @Test public void testBatchSend_CheckProperties() throws Exception { List<Message> messageList = new ArrayList<>(); @@ -91,7 +90,6 @@ public class BatchSendIT extends BaseConf { message.setBody("body".getBytes()); messageList.add(message); - DefaultMQProducer producer = ProducerFactory.getRMQProducer(nsAddr); SendResult sendResult = producer.send(messageList); Assert.assertEquals(SendStatus.SEND_OK, sendResult.getSendStatus()); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index 409ea33..eb45de2 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -119,12 +119,14 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public MessageExt viewMessage(String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + public MessageExt viewMessage( + String offsetMsgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { return defaultMQAdminExtImpl.viewMessage(offsetMsgId); } @Override - public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, + long end) throws MQClientException, InterruptedException { return defaultMQAdminExtImpl.queryMessage(topic, key, maxNum, begin, end); } @@ -140,7 +142,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public void updateBrokerConfig(String brokerAddr, Properties properties) throws RemotingConnectException, RemotingSendRequestException, + public void updateBrokerConfig(String brokerAddr, + Properties properties) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException { defaultMQAdminExtImpl.updateBrokerConfig(brokerAddr, properties); } @@ -158,7 +161,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) throws RemotingException, + public void createAndUpdateSubscriptionGroupConfig(String addr, + SubscriptionGroupConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { defaultMQAdminExtImpl.createAndUpdateSubscriptionGroupConfig(addr, config); } @@ -174,7 +178,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public TopicStatsTable examineTopicStats(String topic) throws RemotingException, MQClientException, InterruptedException, + public TopicStatsTable examineTopicStats( + String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { return defaultMQAdminExtImpl.examineTopicStats(topic); } @@ -185,24 +190,28 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public TopicList fetchTopicsByCLuster(String clusterName) throws RemotingException, MQClientException, InterruptedException { + public TopicList fetchTopicsByCLuster( + String clusterName) throws RemotingException, MQClientException, InterruptedException { return this.defaultMQAdminExtImpl.fetchTopicsByCLuster(clusterName); } @Override - public KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, + public KVTable fetchBrokerRuntimeStats( + final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { return this.defaultMQAdminExtImpl.fetchBrokerRuntimeStats(brokerAddr); } @Override - public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingException, MQClientException, InterruptedException, + public ConsumeStats examineConsumeStats( + String consumerGroup) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { return examineConsumeStats(consumerGroup, null); } @Override - public ConsumeStats examineConsumeStats(String consumerGroup, String topic) throws RemotingException, MQClientException, + public ConsumeStats examineConsumeStats(String consumerGroup, + String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { return defaultMQAdminExtImpl.examineConsumeStats(consumerGroup, topic); } @@ -214,18 +223,21 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public TopicRouteData examineTopicRouteInfo(String topic) throws RemotingException, MQClientException, InterruptedException { + public TopicRouteData examineTopicRouteInfo( + String topic) throws RemotingException, MQClientException, InterruptedException { return defaultMQAdminExtImpl.examineTopicRouteInfo(topic); } @Override - public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup) throws InterruptedException, MQBrokerException, + public ConsumerConnection examineConsumerConnectionInfo( + String consumerGroup) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { return defaultMQAdminExtImpl.examineConsumerConnectionInfo(consumerGroup); } @Override - public ProducerConnection examineProducerConnectionInfo(String producerGroup, final String topic) throws RemotingException, + public ProducerConnection examineProducerConnectionInfo(String producerGroup, + final String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { return defaultMQAdminExtImpl.examineProducerConnectionInfo(producerGroup, topic); } @@ -247,46 +259,54 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public String getKVConfig(String namespace, String key) throws RemotingException, MQClientException, InterruptedException { + public String getKVConfig(String namespace, + String key) throws RemotingException, MQClientException, InterruptedException { return defaultMQAdminExtImpl.getKVConfig(namespace, key); } @Override - public KVTable getKVListByNamespace(String namespace) throws RemotingException, MQClientException, InterruptedException { + public KVTable getKVListByNamespace( + String namespace) throws RemotingException, MQClientException, InterruptedException { return defaultMQAdminExtImpl.getKVListByNamespace(namespace); } @Override - public void deleteTopicInBroker(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException, + public void deleteTopicInBroker(Set<String> addrs, + String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { defaultMQAdminExtImpl.deleteTopicInBroker(addrs, topic); } @Override - public void deleteTopicInNameServer(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException, + public void deleteTopicInNameServer(Set<String> addrs, + String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { defaultMQAdminExtImpl.deleteTopicInNameServer(addrs, topic); } @Override - public void deleteSubscriptionGroup(String addr, String groupName) throws RemotingException, MQBrokerException, InterruptedException, + public void deleteSubscriptionGroup(String addr, + String groupName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName); } @Override - public void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException, + public void createAndUpdateKvConfig(String namespace, String key, + String value) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { defaultMQAdminExtImpl.createAndUpdateKvConfig(namespace, key, value); } @Override - public void deleteKvConfig(String namespace, String key) throws RemotingException, MQBrokerException, InterruptedException, + public void deleteKvConfig(String namespace, + String key) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { defaultMQAdminExtImpl.deleteKvConfig(namespace, key); } - public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, boolean force) + public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, + boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { return defaultMQAdminExtImpl.resetOffsetByTimestampOld(consumerGroup, topic, timestamp, force); } @@ -297,49 +317,57 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { return resetOffsetByTimestamp(topic, group, timestamp, isForce, false); } - public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, boolean isC) + public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, + boolean isC) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { return defaultMQAdminExtImpl.resetOffsetByTimestamp(topic, group, timestamp, isForce, isC); } @Override - public void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException, + public void resetOffsetNew(String consumerGroup, String topic, + long timestamp) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.defaultMQAdminExtImpl.resetOffsetNew(consumerGroup, topic, timestamp); } @Override - public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr) throws RemotingException, + public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, + String clientAddr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { return defaultMQAdminExtImpl.getConsumeStatus(topic, group, clientAddr); } @Override - public void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException, + public void createOrUpdateOrderConf(String key, String value, + boolean isCluster) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { defaultMQAdminExtImpl.createOrUpdateOrderConf(key, value, isCluster); } @Override - public GroupList queryTopicConsumeByWho(String topic) throws InterruptedException, MQBrokerException, RemotingException, + public GroupList queryTopicConsumeByWho( + String topic) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { return this.defaultMQAdminExtImpl.queryTopicConsumeByWho(topic); } @Override - public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group) throws InterruptedException, MQBrokerException, + public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, + final String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { return this.defaultMQAdminExtImpl.queryConsumeTimeSpan(topic, group); } @Override - public boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException, + public boolean cleanExpiredConsumerQueue( + String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { return defaultMQAdminExtImpl.cleanExpiredConsumerQueue(cluster); } @Override - public boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException, + public boolean cleanExpiredConsumerQueueByAddr( + String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { return defaultMQAdminExtImpl.cleanExpiredConsumerQueueByAddr(addr); } @@ -357,7 +385,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) throws RemotingException, + public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, + boolean jstack) throws RemotingException, MQClientException, InterruptedException { return defaultMQAdminExtImpl.getConsumerRunningInfo(consumerGroup, clientId, jstack); } @@ -369,25 +398,29 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId, final String topic, + public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId, + final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { return defaultMQAdminExtImpl.consumeMessageDirectly(consumerGroup, clientId, topic, msgId); } @Override - public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException, InterruptedException, + public List<MessageTrack> messageTrackDetail( + MessageExt msg) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { return this.defaultMQAdminExtImpl.messageTrackDetail(msg); } @Override - public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException, + public void cloneGroupOffset(String srcGroup, String destGroup, String topic, + boolean isOffline) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { this.defaultMQAdminExtImpl.cloneGroupOffset(srcGroup, destGroup, topic, isOffline); } @Override - public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey) throws RemotingConnectException, + public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, + String statsKey) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { return this.defaultMQAdminExtImpl.viewBrokerStatsData(brokerAddr, statsName, statsKey); } @@ -406,7 +439,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException { + public Set<String> getTopicClusterList( + final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException { return this.defaultMQAdminExtImpl.getTopicClusterList(topic); } @@ -472,7 +506,8 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { } @Override - public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index, int count, String consumerGroup) + public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index, + int count, String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { return this.defaultMQAdminExtImpl.queryConsumeQueue( brokerAddr, topic, queueId, index, count, consumerGroup http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 12aea8a..c93c400 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -160,7 +160,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public void updateBrokerConfig(String brokerAddr, Properties properties) throws RemotingConnectException, RemotingSendRequestException, + public void updateBrokerConfig(String brokerAddr, + Properties properties) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException { this.mqClientInstance.getMQClientAPIImpl().updateBrokerConfig(brokerAddr, properties, timeoutMillis); } @@ -178,7 +179,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config) throws RemotingException, + public void createAndUpdateSubscriptionGroupConfig(String addr, + SubscriptionGroupConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.mqClientInstance.getMQClientAPIImpl().createSubscriptionGroup(addr, config, timeoutMillis); } @@ -194,7 +196,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public TopicStatsTable examineTopicStats(String topic) throws RemotingException, MQClientException, InterruptedException, + public TopicStatsTable examineTopicStats( + String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); TopicStatsTable topicStatsTable = new TopicStatsTable(); @@ -220,24 +223,28 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public TopicList fetchTopicsByCLuster(String clusterName) throws RemotingException, MQClientException, InterruptedException { + public TopicList fetchTopicsByCLuster( + String clusterName) throws RemotingException, MQClientException, InterruptedException { return this.mqClientInstance.getMQClientAPIImpl().getTopicsByCluster(clusterName, timeoutMillis); } @Override - public KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, + public KVTable fetchBrokerRuntimeStats( + final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { return this.mqClientInstance.getMQClientAPIImpl().getBrokerRuntimeInfo(brokerAddr, timeoutMillis); } @Override - public ConsumeStats examineConsumeStats(String consumerGroup) throws RemotingException, MQClientException, InterruptedException, + public ConsumeStats examineConsumeStats( + String consumerGroup) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { return examineConsumeStats(consumerGroup, null); } @Override - public ConsumeStats examineConsumeStats(String consumerGroup, String topic) throws RemotingException, MQClientException, + public ConsumeStats examineConsumeStats(String consumerGroup, + String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { String retryTopic = MixAll.getRetryTopic(consumerGroup); TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic); @@ -269,12 +276,14 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public TopicRouteData examineTopicRouteInfo(String topic) throws RemotingException, MQClientException, InterruptedException { + public TopicRouteData examineTopicRouteInfo( + String topic) throws RemotingException, MQClientException, InterruptedException { return this.mqClientInstance.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis); } @Override - public MessageExt viewMessage(String topic, String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + public MessageExt viewMessage(String topic, + String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { MessageDecoder.decodeMessageId(msgId); return this.viewMessage(msgId); @@ -285,7 +294,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public ConsumerConnection examineConsumerConnectionInfo(String consumerGroup) throws InterruptedException, MQBrokerException, + public ConsumerConnection examineConsumerConnectionInfo( + String consumerGroup) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { ConsumerConnection result = new ConsumerConnection(); String topic = MixAll.getRetryTopic(consumerGroup); @@ -308,7 +318,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public ProducerConnection examineProducerConnectionInfo(String producerGroup, final String topic) throws RemotingException, + public ProducerConnection examineProducerConnectionInfo(String producerGroup, + final String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { ProducerConnection result = new ProducerConnection(); List<BrokerData> brokers = this.examineTopicRouteInfo(topic).getBrokerDatas(); @@ -345,17 +356,20 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public String getKVConfig(String namespace, String key) throws RemotingException, MQClientException, InterruptedException { + public String getKVConfig(String namespace, + String key) throws RemotingException, MQClientException, InterruptedException { return this.mqClientInstance.getMQClientAPIImpl().getKVConfigValue(namespace, key, timeoutMillis); } @Override - public KVTable getKVListByNamespace(String namespace) throws RemotingException, MQClientException, InterruptedException { + public KVTable getKVListByNamespace( + String namespace) throws RemotingException, MQClientException, InterruptedException { return this.mqClientInstance.getMQClientAPIImpl().getKVListByNamespace(namespace, timeoutMillis); } @Override - public void deleteTopicInBroker(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException, + public void deleteTopicInBroker(Set<String> addrs, + String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { for (String addr : addrs) { this.mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic, timeoutMillis); @@ -363,7 +377,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public void deleteTopicInNameServer(Set<String> addrs, String topic) throws RemotingException, MQBrokerException, InterruptedException, + public void deleteTopicInNameServer(Set<String> addrs, + String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { if (addrs == null) { String ns = this.mqClientInstance.getMQClientAPIImpl().fetchNameServerAddr(); @@ -375,25 +390,29 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public void deleteSubscriptionGroup(String addr, String groupName) throws RemotingException, MQBrokerException, InterruptedException, + public void deleteSubscriptionGroup(String addr, + String groupName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, timeoutMillis); } @Override - public void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException, + public void createAndUpdateKvConfig(String namespace, String key, + String value) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.mqClientInstance.getMQClientAPIImpl().putKVConfigValue(namespace, key, value, timeoutMillis); } @Override - public void deleteKvConfig(String namespace, String key) throws RemotingException, MQBrokerException, InterruptedException, + public void deleteKvConfig(String namespace, + String key) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { this.mqClientInstance.getMQClientAPIImpl().deleteKVConfigValue(namespace, key, timeoutMillis); } @Override - public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, boolean force) + public List<RollbackStats> resetOffsetByTimestampOld(String consumerGroup, String topic, long timestamp, + boolean force) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); List<RollbackStats> rollbackStatsList = new ArrayList<RollbackStats>(); @@ -444,7 +463,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException, + public void resetOffsetNew(String consumerGroup, String topic, + long timestamp) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { try { this.resetOffsetByTimestamp(topic, consumerGroup, timestamp, true); @@ -457,7 +477,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } } - public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, boolean isC) + public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, + boolean isC) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas(); @@ -478,7 +499,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { return allOffsetTable; } - private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue queue, OffsetWrapper offsetWrapper, + private RollbackStats resetOffsetConsumeOffset(String brokerAddr, String consumeGroup, MessageQueue queue, + OffsetWrapper offsetWrapper, long timestamp, boolean force) throws RemotingException, InterruptedException, MQBrokerException { long resetOffset; if (timestamp == -1) { @@ -511,7 +533,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr) throws RemotingException, + public Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, + String clientAddr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas(); @@ -525,7 +548,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { return Collections.EMPTY_MAP; } - public void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException, + public void createOrUpdateOrderConf(String key, String value, + boolean isCluster) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { if (isCluster) { @@ -564,7 +588,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public GroupList queryTopicConsumeByWho(String topic) throws InterruptedException, MQBrokerException, RemotingException, + public GroupList queryTopicConsumeByWho( + String topic) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); @@ -581,7 +606,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group) throws InterruptedException, MQBrokerException, + public List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, + final String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException { List<QueueTimeSpan> spanSet = new ArrayList<QueueTimeSpan>(); TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); @@ -595,7 +621,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException, + public boolean cleanExpiredConsumerQueue( + String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { boolean result = false; try { @@ -614,7 +641,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { return result; } - public boolean cleanExpiredConsumerQueueByCluster(ClusterInfo clusterInfo, String cluster) throws RemotingConnectException, + public boolean cleanExpiredConsumerQueueByCluster(ClusterInfo clusterInfo, + String cluster) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { boolean result = false; String[] addrs = clusterInfo.retrieveAllAddrByCluster(cluster); @@ -625,7 +653,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public boolean cleanExpiredConsumerQueueByAddr(String addr) throws RemotingConnectException, RemotingSendRequestException, + public boolean cleanExpiredConsumerQueueByAddr( + String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { boolean result = mqClientInstance.getMQClientAPIImpl().cleanExpiredConsumeQueue(addr, timeoutMillis); log.warn("clean expired ConsumeQueue on target " + addr + " broker " + result); @@ -671,7 +700,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, boolean jstack) throws RemotingException, + public ConsumerRunningInfo getConsumerRunningInfo(String consumerGroup, String clientId, + boolean jstack) throws RemotingException, MQClientException, InterruptedException { String topic = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup; TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic); @@ -698,7 +728,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId, final String topic, + public ConsumeMessageDirectlyResult consumeMessageDirectly(final String consumerGroup, final String clientId, + final String topic, final String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { MessageExt msg = this.viewMessage(topic, msgId); if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) { @@ -712,7 +743,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException, InterruptedException, + public List<MessageTrack> messageTrackDetail( + MessageExt msg) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { List<MessageTrack> result = new ArrayList<MessageTrack>(); @@ -794,7 +826,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { return result; } - public boolean consumed(final MessageExt msg, final String group) throws RemotingException, MQClientException, InterruptedException, + public boolean consumed(final MessageExt msg, + final String group) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { ConsumeStats cstats = this.examineConsumeStats(group); @@ -822,7 +855,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException, + public void cloneGroupOffset(String srcGroup, String destGroup, String topic, + boolean isOffline) throws RemotingException, MQClientException, InterruptedException, MQBrokerException { String retryTopic = MixAll.getRetryTopic(srcGroup); TopicRouteData topicRouteData = this.examineTopicRouteInfo(retryTopic); @@ -836,7 +870,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey) throws RemotingConnectException, + public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, + String statsKey) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException { return this.mqClientInstance.getMQClientAPIImpl().viewBrokerStatsData(brokerAddr, statsName, statsKey, timeoutMillis); } @@ -855,7 +890,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException, MQClientException, + public Set<String> getTopicClusterList( + final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException { Set<String> clusterSet = new HashSet<String>(); ClusterInfo clusterInfo = examineBrokerClusterInfo(); @@ -873,13 +909,15 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, + public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, + long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { return this.mqClientInstance.getMQClientAPIImpl().getAllSubscriptionGroup(brokerAddr, timeoutMillis); } @Override - public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, + public TopicConfigSerializeWrapper getAllTopicGroup(final String brokerAddr, + long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { return this.mqClientInstance.getMQClientAPIImpl().getAllTopicConfig(brokerAddr, timeoutMillis); } @@ -915,12 +953,14 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public MessageExt viewMessage(String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + public MessageExt viewMessage( + String msgId) throws RemotingException, MQBrokerException, InterruptedException, MQClientException { return this.mqClientInstance.getMQAdminImpl().viewMessage(msgId); } @Override - public QueryResult queryMessage(String topic, String key, int maxNum, long begin, long end) throws MQClientException, + public QueryResult queryMessage(String topic, String key, int maxNum, long begin, + long end) throws MQClientException, InterruptedException { return this.mqClientInstance.getMQAdminImpl().queryMessage(topic, key, maxNum, begin, end); } @@ -953,7 +993,8 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { } @Override - public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index, int count, String consumerGroup) + public QueryConsumeQueueResponseBody queryConsumeQueue(String brokerAddr, String topic, int queueId, long index, + int count, String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException { return this.mqClientInstance.getMQClientAPIImpl().queryConsumeQueue( brokerAddr, topic, queueId, index, count, consumerGroup, timeoutMillis http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 82add92..16b4427 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -64,42 +64,51 @@ public interface MQAdminExt extends MQAdmin { Properties getBrokerConfig(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, UnsupportedEncodingException, InterruptedException, MQBrokerException; - void createAndUpdateTopicConfig(final String addr, final TopicConfig config) throws RemotingException, MQBrokerException, + void createAndUpdateTopicConfig(final String addr, + final TopicConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; - void createAndUpdateSubscriptionGroupConfig(final String addr, final SubscriptionGroupConfig config) throws RemotingException, + void createAndUpdateSubscriptionGroupConfig(final String addr, + final SubscriptionGroupConfig config) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group); TopicConfig examineTopicConfig(final String addr, final String topic); - TopicStatsTable examineTopicStats(final String topic) throws RemotingException, MQClientException, InterruptedException, + TopicStatsTable examineTopicStats( + final String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException; - TopicList fetchTopicsByCLuster(String clusterName) throws RemotingException, MQClientException, InterruptedException; + TopicList fetchTopicsByCLuster( + String clusterName) throws RemotingException, MQClientException, InterruptedException; - KVTable fetchBrokerRuntimeStats(final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, + KVTable fetchBrokerRuntimeStats( + final String brokerAddr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException; - ConsumeStats examineConsumeStats(final String consumerGroup) throws RemotingException, MQClientException, InterruptedException, + ConsumeStats examineConsumeStats( + final String consumerGroup) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; - ConsumeStats examineConsumeStats(final String consumerGroup, final String topic) throws RemotingException, MQClientException, + ConsumeStats examineConsumeStats(final String consumerGroup, + final String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; - TopicRouteData examineTopicRouteInfo(final String topic) throws RemotingException, MQClientException, InterruptedException; + TopicRouteData examineTopicRouteInfo( + final String topic) throws RemotingException, MQClientException, InterruptedException; ConsumerConnection examineConsumerConnectionInfo(final String consumerGroup) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException, MQClientException; - ProducerConnection examineProducerConnectionInfo(final String producerGroup, final String topic) throws RemotingException, + ProducerConnection examineProducerConnectionInfo(final String producerGroup, + final String topic) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; List<String> getNameServerAddressList(); @@ -109,20 +118,24 @@ public interface MQAdminExt extends MQAdmin { void putKVConfig(final String namespace, final String key, final String value); - String getKVConfig(final String namespace, final String key) throws RemotingException, MQClientException, InterruptedException; + String getKVConfig(final String namespace, + final String key) throws RemotingException, MQClientException, InterruptedException; - KVTable getKVListByNamespace(final String namespace) throws RemotingException, MQClientException, InterruptedException; + KVTable getKVListByNamespace( + final String namespace) throws RemotingException, MQClientException, InterruptedException; void deleteTopicInBroker(final Set<String> addrs, final String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; - void deleteTopicInNameServer(final Set<String> addrs, final String topic) throws RemotingException, MQBrokerException, + void deleteTopicInNameServer(final Set<String> addrs, + final String topic) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; void deleteSubscriptionGroup(final String addr, String groupName) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; - void createAndUpdateKvConfig(String namespace, String key, String value) throws RemotingException, MQBrokerException, + void createAndUpdateKvConfig(String namespace, String key, + String value) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; void deleteKvConfig(String namespace, String key) throws RemotingException, MQBrokerException, InterruptedException, @@ -137,16 +150,19 @@ public interface MQAdminExt extends MQAdmin { void resetOffsetNew(String consumerGroup, String topic, long timestamp) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; - Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, String clientAddr) throws RemotingException, + Map<String, Map<MessageQueue, Long>> getConsumeStatus(String topic, String group, + String clientAddr) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; - void createOrUpdateOrderConf(String key, String value, boolean isCluster) throws RemotingException, MQBrokerException, + void createOrUpdateOrderConf(String key, String value, + boolean isCluster) throws RemotingException, MQBrokerException, InterruptedException, MQClientException; GroupList queryTopicConsumeByWho(final String topic) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException, RemotingException, MQClientException; - List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, final String group) throws InterruptedException, MQBrokerException, + List<QueueTimeSpan> queryConsumeTimeSpan(final String topic, + final String group) throws InterruptedException, MQBrokerException, RemotingException, MQClientException; boolean cleanExpiredConsumerQueue(String cluster) throws RemotingConnectException, RemotingSendRequestException, @@ -173,7 +189,8 @@ public interface MQAdminExt extends MQAdmin { String topic, String msgId) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; - List<MessageTrack> messageTrackDetail(MessageExt msg) throws RemotingException, MQClientException, InterruptedException, + List<MessageTrack> messageTrackDetail( + MessageExt msg) throws RemotingException, MQClientException, InterruptedException, MQBrokerException; void cloneGroupOffset(String srcGroup, String destGroup, String topic, boolean isOffline) throws RemotingException, @@ -190,7 +207,8 @@ public interface MQAdminExt extends MQAdmin { long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, InterruptedException; - Set<String> getTopicClusterList(final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException; + Set<String> getTopicClusterList( + final String topic) throws InterruptedException, MQBrokerException, MQClientException, RemotingException; SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, @@ -209,18 +227,9 @@ public interface MQAdminExt extends MQAdmin { * Command Code : RequestCode.UPDATE_NAMESRV_CONFIG * * <br> If param(nameServers) is null or empty, will use name servers from ns! - * - * @param properties - * @param nameServers - * @throws InterruptedException - * @throws RemotingConnectException - * @throws UnsupportedEncodingException - * @throws RemotingSendRequestException - * @throws RemotingTimeoutException - * @throws MQClientException - * @throws MQBrokerException */ - void updateNameServerConfig(final Properties properties, final List<String> nameServers) throws InterruptedException, RemotingConnectException, + void updateNameServerConfig(final Properties properties, + final List<String> nameServers) throws InterruptedException, RemotingConnectException, UnsupportedEncodingException, RemotingSendRequestException, RemotingTimeoutException, MQClientException, MQBrokerException; @@ -230,14 +239,7 @@ public interface MQAdminExt extends MQAdmin { * Command Code : RequestCode.GET_NAMESRV_CONFIG * <br> If param(nameServers) is null or empty, will use name servers from ns! * - * @param nameServers * @return The fetched name server config - * @throws InterruptedException - * @throws RemotingTimeoutException - * @throws RemotingSendRequestException - * @throws RemotingConnectException - * @throws MQClientException - * @throws UnsupportedEncodingException */ Map<String, Properties> getNameServerConfig(final List<String> nameServers) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, @@ -252,15 +254,9 @@ public interface MQAdminExt extends MQAdmin { * @param index start offset * @param count how many * @param consumerGroup group - * @return - * @throws InterruptedException - * @throws RemotingTimeoutException - * @throws RemotingSendRequestException - * @throws RemotingConnectException - * @throws MQClientException */ QueryConsumeQueueResponseBody queryConsumeQueue(final String brokerAddr, - final String topic, final int queueId, - final long index, final int count, final String consumerGroup) + final String topic, final int queueId, + final long index, final int count, final String consumerGroup) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQClientException; } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java index 919f673..11a3604 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/GetBrokerConfigCommand.java @@ -60,7 +60,8 @@ public class GetBrokerConfigCommand implements SubCommand { } @Override - public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) throws SubCommandException { + public void execute(final CommandLine commandLine, final Options options, + final RPCHook rpcHook) throws SubCommandException { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java index c7b8ac5..6a0cd71 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java @@ -61,7 +61,8 @@ public class ClusterListSubCommand implements SubCommand { } @Override - public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException { + public void execute(final CommandLine commandLine, final Options options, + RPCHook rpcHook) throws SubCommandException { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); @@ -164,7 +165,8 @@ public class ClusterListSubCommand implements SubCommand { } } - private void printClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException, + private void printClusterBaseInfo( + final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException, RemotingSendRequestException, InterruptedException, MQBrokerException { ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); @@ -253,16 +255,16 @@ public class ClusterListSubCommand implements SubCommand { } System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n", - clusterName, - brokerName, - next1.getKey(), - next1.getValue(), - version, - String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills), - String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills), - pageCacheLockTimeMills, - String.format("%2.2f", hour), - String.format("%.4f", space) + clusterName, + brokerName, + next1.getKey(), + next1.getValue(), + version, + String.format("%9.2f(%s,%sms)", in, sendThreadPoolQueueSize, sendThreadPoolQueueHeadWaitTimeMills), + String.format("%9.2f(%s,%sms)", out, pullThreadPoolQueueSize, pullThreadPoolQueueHeadWaitTimeMills), + pageCacheLockTimeMills, + String.format("%2.2f", hour), + String.format("%.4f", space) ); } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java index 910eb1c..7316526 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/UpdateSubGroupSubCommand.java @@ -90,7 +90,8 @@ public class UpdateSubGroupSubCommand implements SubCommand { } @Override - public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException { + public void execute(final CommandLine commandLine, final Options options, + RPCHook rpcHook) throws SubCommandException { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java index 64f634e..d233b65 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/DecodeMessageIdCommond.java @@ -45,7 +45,8 @@ public class DecodeMessageIdCommond implements SubCommand { } @Override - public void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException { + public void execute(final CommandLine commandLine, final Options options, + RPCHook rpcHook) throws SubCommandException { String messageId = commandLine.getOptionValue('i').trim(); try { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java index ac51267..46c5f74 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/PrintMessageByQueueCommand.java @@ -52,7 +52,8 @@ public class PrintMessageByQueueCommand implements SubCommand { return timestamp; } - private static void calculateByTag(final List<MessageExt> msgs, final Map<String, AtomicLong> tagCalmap, final boolean calByTag) { + private static void calculateByTag(final List<MessageExt> msgs, final Map<String, AtomicLong> tagCalmap, + final boolean calByTag) { if (!calByTag) return; @@ -85,7 +86,8 @@ public class PrintMessageByQueueCommand implements SubCommand { } } - public static void printMessage(final List<MessageExt> msgs, final String charsetName, boolean printMsg, boolean printBody) { + public static void printMessage(final List<MessageExt> msgs, final String charsetName, boolean printMsg, + boolean printBody) { if (!printMsg) return; @@ -162,11 +164,11 @@ public class PrintMessageByQueueCommand implements SubCommand { String charsetName = !commandLine.hasOption('c') ? "UTF-8" : commandLine.getOptionValue('c').trim(); boolean printMsg = - commandLine.hasOption('p') && Boolean.parseBoolean(commandLine.getOptionValue('p').trim()); + commandLine.hasOption('p') && Boolean.parseBoolean(commandLine.getOptionValue('p').trim()); boolean printBody = - commandLine.hasOption('d') && Boolean.parseBoolean(commandLine.getOptionValue('d').trim()); + commandLine.hasOption('d') && Boolean.parseBoolean(commandLine.getOptionValue('d').trim()); boolean calByTag = - commandLine.hasOption('f') && Boolean.parseBoolean(commandLine.getOptionValue('f').trim()); + commandLine.hasOption('f') && Boolean.parseBoolean(commandLine.getOptionValue('f').trim()); String subExpression = !commandLine.hasOption('s') ? "*" : commandLine.getOptionValue('s').trim(); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java index 05ae003..39abbc9 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java @@ -262,7 +262,8 @@ public class QueryMsgByIdSubCommand implements SubCommand { } } - private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final String consumerGroup, final String clientId, final String msgId) { + private void pushMsg(final DefaultMQAdminExt defaultMQAdminExt, final String consumerGroup, final String clientId, + final String msgId) { try { ConsumeMessageDirectlyResult result = defaultMQAdminExt.consumeMessageDirectly(consumerGroup, clientId, msgId); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java index 5c93ad7..0103b50 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByUniqueKeySubCommand.java @@ -39,7 +39,8 @@ import org.apache.rocketmq.tools.command.SubCommandException; public class QueryMsgByUniqueKeySubCommand implements SubCommand { - public static void queryById(final DefaultMQAdminExt admin, final String topic, final String msgId) throws MQClientException, + public static void queryById(final DefaultMQAdminExt admin, final String topic, + final String msgId) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, IOException { MessageExt msg = admin.viewMessage(topic, msgId); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/97aa813e/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java index ce63616..22ce867 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/namesrv/GetNamesrvConfigCommand.java @@ -46,7 +46,8 @@ public class GetNamesrvConfigCommand implements SubCommand { } @Override - public void execute(final CommandLine commandLine, final Options options, final RPCHook rpcHook) throws SubCommandException { + public void execute(final CommandLine commandLine, final Options options, + final RPCHook rpcHook) throws SubCommandException { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); try {