http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 84ee7db..2dd9200 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -16,6 +16,17 @@ */ package org.apache.rocketmq.client.impl; +import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.consumer.PullCallback; import org.apache.rocketmq.client.consumer.PullResult; @@ -37,21 +48,97 @@ import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.admin.ConsumeStats; import org.apache.rocketmq.common.admin.TopicStatsTable; -import org.apache.rocketmq.common.message.*; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageClientIDSetter; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.namesrv.TopAddressing; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; -import org.apache.rocketmq.common.protocol.body.*; -import org.apache.rocketmq.common.protocol.header.*; +import org.apache.rocketmq.common.protocol.body.BrokerStatsData; +import org.apache.rocketmq.common.protocol.body.ClusterInfo; +import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.common.protocol.body.ConsumeStatsList; +import org.apache.rocketmq.common.protocol.body.ConsumerConnection; +import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody; +import org.apache.rocketmq.common.protocol.body.GroupList; +import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody; +import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody; +import org.apache.rocketmq.common.protocol.body.ProducerConnection; +import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody; +import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody; +import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.common.protocol.body.ResetOffsetBody; +import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; +import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; +import org.apache.rocketmq.common.protocol.body.TopicList; +import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody; +import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader; +import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; +import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader; +import org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader; +import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader; +import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody; +import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader; +import org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader; +import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader; +import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; +import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader; +import org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader; +import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader; +import org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader; +import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader; import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader; -import org.apache.rocketmq.common.protocol.header.namesrv.*; +import org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader; +import org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader; import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RemotingClient; -import org.apache.rocketmq.remoting.exception.*; +import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.apache.rocketmq.remoting.netty.ResponseFuture; @@ -60,17 +147,11 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.slf4j.Logger; -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; - - public class MQClientAPIImpl { private final static Logger log = ClientLogger.getLog(); public static boolean sendSmartMsg = - Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true")); + Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true")); static { System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); @@ -83,7 +164,7 @@ public class MQClientAPIImpl { private ClientConfig clientConfig; public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor, - RPCHook rpcHook, final ClientConfig clientConfig) { + RPCHook rpcHook, final ClientConfig clientConfig) { this.clientConfig = clientConfig; topAddressing = new TopAddressing(MixAll.WS_ADDR, clientConfig.getUnitName()); this.remotingClient = new NettyRemotingClient(nettyClientConfig, null); @@ -149,14 +230,14 @@ public class MQClientAPIImpl { } public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null); byte[] body = RemotingSerializable.encode(config); request.setBody(body); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -171,7 +252,7 @@ public class MQClientAPIImpl { } public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topicConfig.getTopicName()); requestHeader.setDefaultTopic(defaultTopic); @@ -185,7 +266,7 @@ public class MQClientAPIImpl { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -199,31 +280,31 @@ public class MQClientAPIImpl { } public SendResult sendMessage(// - final String addr, // 1 - final String brokerName, // 2 - final Message msg, // 3 - final SendMessageRequestHeader requestHeader, // 4 - final long timeoutMillis, // 5 - final CommunicationMode communicationMode, // 6 - final SendMessageContext context, // 7 - final DefaultMQProducerImpl producer // 8 + final String addr, // 1 + final String brokerName, // 2 + final Message msg, // 3 + final SendMessageRequestHeader requestHeader, // 4 + final long timeoutMillis, // 5 + final CommunicationMode communicationMode, // 6 + final SendMessageContext context, // 7 + final DefaultMQProducerImpl producer // 8 ) throws RemotingException, MQBrokerException, InterruptedException { return sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode, null, null, null, 0, context, producer); } public SendResult sendMessage(// - final String addr, // 1 - final String brokerName, // 2 - final Message msg, // 3 - final SendMessageRequestHeader requestHeader, // 4 - final long timeoutMillis, // 5 - final CommunicationMode communicationMode, // 6 - final SendCallback sendCallback, // 7 - final TopicPublishInfo topicPublishInfo, // 8 - final MQClientInstance instance, // 9 - final int retryTimesWhenSendFailed, // 10 - final SendMessageContext context, // 11 - final DefaultMQProducerImpl producer // 12 + final String addr, // 1 + final String brokerName, // 2 + final Message msg, // 3 + final SendMessageRequestHeader requestHeader, // 4 + final long timeoutMillis, // 5 + final CommunicationMode communicationMode, // 6 + final SendCallback sendCallback, // 7 + final TopicPublishInfo topicPublishInfo, // 8 + final MQClientInstance instance, // 9 + final int retryTimesWhenSendFailed, // 10 + final SendMessageContext context, // 11 + final DefaultMQProducerImpl producer // 12 ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = null; if (sendSmartMsg) { @@ -242,7 +323,7 @@ public class MQClientAPIImpl { case ASYNC: final AtomicInteger times = new AtomicInteger(); this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, context, producer); + retryTimesWhenSendFailed, times, context, producer); return null; case SYNC: return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request); @@ -255,11 +336,11 @@ public class MQClientAPIImpl { } private SendResult sendMessageSync(// - final String addr, // - final String brokerName, // - final Message msg, // - final long timeoutMillis, // - final RemotingCommand request// + final String addr, // + final String brokerName, // + final Message msg, // + final long timeoutMillis, // + final RemotingCommand request// ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; @@ -267,18 +348,18 @@ public class MQClientAPIImpl { } private void sendMessageAsync(// - final String addr, // - final String brokerName, // - final Message msg, // - final long timeoutMillis, // - final RemotingCommand request, // - final SendCallback sendCallback, // - final TopicPublishInfo topicPublishInfo, // - final MQClientInstance instance, // - final int retryTimesWhenSendFailed, // - final AtomicInteger times, // - final SendMessageContext context, // - final DefaultMQProducerImpl producer // + final String addr, // + final String brokerName, // + final Message msg, // + final long timeoutMillis, // + final RemotingCommand request, // + final SendCallback sendCallback, // + final TopicPublishInfo topicPublishInfo, // + final MQClientInstance instance, // + final int retryTimesWhenSendFailed, // + final AtomicInteger times, // + final SendMessageContext context, // + final DefaultMQProducerImpl producer // ) throws InterruptedException, RemotingException { this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override @@ -318,68 +399,67 @@ public class MQClientAPIImpl { } catch (Exception e) { producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, e, context, false, producer); + retryTimesWhenSendFailed, times, e, context, false, producer); } } else { producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true); if (!responseFuture.isSendRequestOK()) { MQClientException ex = new MQClientException("send request failed", responseFuture.getCause()); onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, ex, context, true, producer); + retryTimesWhenSendFailed, times, ex, context, true, producer); } else if (responseFuture.isTimeout()) { MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", - responseFuture.getCause()); + responseFuture.getCause()); onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, ex, context, true, producer); + retryTimesWhenSendFailed, times, ex, context, true, producer); } else { MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause()); onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance, - retryTimesWhenSendFailed, times, ex, context, true, producer); + retryTimesWhenSendFailed, times, ex, context, true, producer); } } } }); } - private void onExceptionImpl(final String brokerName, // - final Message msg, // - final long timeoutMillis, // - final RemotingCommand request, // - final SendCallback sendCallback, // - final TopicPublishInfo topicPublishInfo, // - final MQClientInstance instance, // - final int timesTotal, // - final AtomicInteger curTimes, // - final Exception e, // - final SendMessageContext context, // - final boolean needRetry, // - final DefaultMQProducerImpl producer // 12 + final Message msg, // + final long timeoutMillis, // + final RemotingCommand request, // + final SendCallback sendCallback, // + final TopicPublishInfo topicPublishInfo, // + final MQClientInstance instance, // + final int timesTotal, // + final AtomicInteger curTimes, // + final Exception e, // + final SendMessageContext context, // + final boolean needRetry, // + final DefaultMQProducerImpl producer // 12 ) { int tmp = curTimes.incrementAndGet(); if (needRetry && tmp <= timesTotal) { MessageQueue tmpmq = producer.selectOneMessageQueue(topicPublishInfo, brokerName); String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName()); log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr, - tmpmq.getBrokerName()); + tmpmq.getBrokerName()); try { request.setOpaque(RemotingCommand.createNewRequestId()); sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, - timesTotal, curTimes, context, producer); + timesTotal, curTimes, context, producer); } catch (InterruptedException e1) { onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, - context, false, producer); + context, false, producer); } catch (RemotingConnectException e1) { producer.updateFaultItem(brokerName, 3000, true); onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, - context, true, producer); + context, true, producer); } catch (RemotingTooMuchRequestException e1) { onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, - context, false, producer); + context, false, producer); } catch (RemotingException e1) { producer.updateFaultItem(brokerName, 3000, true); onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1, - context, true, producer); + context, true, producer); } } else { if (context != null) { @@ -393,11 +473,10 @@ public class MQClientAPIImpl { } } - private SendResult processSendResponse(// - final String brokerName, // - final Message msg, // - final RemotingCommand response// + final String brokerName, // + final Message msg, // + final RemotingCommand response// ) throws MQBrokerException, RemotingCommandException { switch (response.getCode()) { case ResponseCode.FLUSH_DISK_TIMEOUT: @@ -426,13 +505,13 @@ public class MQClientAPIImpl { } SendMessageResponseHeader responseHeader = - (SendMessageResponseHeader) response.decodeCommandCustomHeader(SendMessageResponseHeader.class); + (SendMessageResponseHeader)response.decodeCommandCustomHeader(SendMessageResponseHeader.class); MessageQueue messageQueue = new MessageQueue(msg.getTopic(), brokerName, responseHeader.getQueueId()); SendResult sendResult = new SendResult(sendStatus, - MessageClientIDSetter.getUniqID(msg), - responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); + MessageClientIDSetter.getUniqID(msg), + responseHeader.getMsgId(), messageQueue, responseHeader.getQueueOffset()); sendResult.setTransactionId(responseHeader.getTransactionId()); String regionId = response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION); String traceOn = response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH); @@ -454,13 +533,12 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public PullResult pullMessage(// - final String addr, // - final PullMessageRequestHeader requestHeader, // - final long timeoutMillis, // - final CommunicationMode communicationMode, // - final PullCallback pullCallback// + final String addr, // + final PullMessageRequestHeader requestHeader, // + final long timeoutMillis, // + final CommunicationMode communicationMode, // + final PullCallback pullCallback// ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); @@ -481,12 +559,11 @@ public class MQClientAPIImpl { return null; } - private void pullMessageAsync(// - final String addr, // 1 - final RemotingCommand request, // - final long timeoutMillis, // - final PullCallback pullCallback// + final String addr, // 1 + final RemotingCommand request, // + final long timeoutMillis, // + final PullCallback pullCallback// ) throws RemotingException, InterruptedException { this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() { @Override @@ -505,7 +582,7 @@ public class MQClientAPIImpl { pullCallback.onException(new MQClientException("send request failed", responseFuture.getCause())); } else if (responseFuture.isTimeout()) { pullCallback.onException(new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms", - responseFuture.getCause())); + responseFuture.getCause())); } else { pullCallback.onException(new MQClientException("unknow reseaon", responseFuture.getCause())); } @@ -515,9 +592,9 @@ public class MQClientAPIImpl { } private PullResult pullMessageSync(// - final String addr, // 1 - final RemotingCommand request, // 2 - final long timeoutMillis// 3 + final String addr, // 1 + final RemotingCommand request, // 2 + final long timeoutMillis// 3 ) throws RemotingException, InterruptedException, MQBrokerException { RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; @@ -545,20 +622,20 @@ public class MQClientAPIImpl { } PullMessageResponseHeader responseHeader = - (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); + (PullMessageResponseHeader)response.decodeCommandCustomHeader(PullMessageResponseHeader.class); return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(), - responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody()); + responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody()); } public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException { + throws RemotingException, MQBrokerException, InterruptedException { ViewMessageRequestHeader requestHeader = new ViewMessageRequestHeader(); requestHeader.setOffset(phyoffset); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -573,9 +650,8 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException { + throws RemotingException, MQBrokerException, InterruptedException { SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader(); requestHeader.setTopic(topic); requestHeader.setQueueId(queueId); @@ -583,12 +659,12 @@ public class MQClientAPIImpl { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { SearchOffsetResponseHeader responseHeader = - (SearchOffsetResponseHeader) response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); + (SearchOffsetResponseHeader)response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class); return responseHeader.getOffset(); } default: @@ -598,21 +674,20 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException { + throws RemotingException, MQBrokerException, InterruptedException { GetMaxOffsetRequestHeader requestHeader = new GetMaxOffsetRequestHeader(); requestHeader.setTopic(topic); requestHeader.setQueueId(queueId); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { GetMaxOffsetResponseHeader responseHeader = - (GetMaxOffsetResponseHeader) response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class); + (GetMaxOffsetResponseHeader)response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class); return responseHeader.getOffset(); } @@ -623,24 +698,23 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public List<String> getConsumerIdListByGroup(// - final String addr, // - final String consumerGroup, // - final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - MQBrokerException, InterruptedException { + final String addr, // + final String consumerGroup, // + final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + MQBrokerException, InterruptedException { GetConsumerListByGroupRequestHeader requestHeader = new GetConsumerListByGroupRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { if (response.getBody() != null) { GetConsumerListByGroupResponseBody body = - GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class); + GetConsumerListByGroupResponseBody.decode(response.getBody(), GetConsumerListByGroupResponseBody.class); return body.getConsumerIdList(); } } @@ -651,21 +725,20 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException { + throws RemotingException, MQBrokerException, InterruptedException { GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader(); requestHeader.setTopic(topic); requestHeader.setQueueId(queueId); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { GetMinOffsetResponseHeader responseHeader = - (GetMinOffsetResponseHeader) response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class); + (GetMinOffsetResponseHeader)response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class); return responseHeader.getOffset(); } @@ -676,21 +749,20 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException { + throws RemotingException, MQBrokerException, InterruptedException { GetEarliestMsgStoretimeRequestHeader requestHeader = new GetEarliestMsgStoretimeRequestHeader(); requestHeader.setTopic(topic); requestHeader.setQueueId(queueId); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { GetEarliestMsgStoretimeResponseHeader responseHeader = - (GetEarliestMsgStoretimeResponseHeader) response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class); + (GetEarliestMsgStoretimeResponseHeader)response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class); return responseHeader.getTimestamp(); } @@ -701,21 +773,20 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public long queryConsumerOffset(// - final String addr, // - final QueryConsumerOffsetRequestHeader requestHeader, // - final long timeoutMillis// + final String addr, // + final QueryConsumerOffsetRequestHeader requestHeader, // + final long timeoutMillis// ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { QueryConsumerOffsetResponseHeader responseHeader = - (QueryConsumerOffsetResponseHeader) response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class); + (QueryConsumerOffsetResponseHeader)response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class); return responseHeader.getOffset(); } @@ -726,16 +797,15 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void updateConsumerOffset(// - final String addr, // - final UpdateConsumerOffsetRequestHeader requestHeader, // - final long timeoutMillis// + final String addr, // + final UpdateConsumerOffsetRequestHeader requestHeader, // + final long timeoutMillis// ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -748,23 +818,21 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void updateConsumerOffsetOneway(// - final String addr, // - final UpdateConsumerOffsetRequestHeader requestHeader, // - final long timeoutMillis// + final String addr, // + final UpdateConsumerOffsetRequestHeader requestHeader, // + final long timeoutMillis// ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException, - InterruptedException { + InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader); this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); } - public void sendHearbeat(// - final String addr, // - final HeartbeatData heartbeatData, // - final long timeoutMillis// + final String addr, // + final HeartbeatData heartbeatData, // + final long timeoutMillis// ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); @@ -782,13 +850,12 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void unregisterClient(// - final String addr, // - final String clientID, // - final String producerGroup, // - final String consumerGroup, // - final long timeoutMillis// + final String addr, // + final String clientID, // + final String producerGroup, // + final String consumerGroup, // + final long timeoutMillis// ) throws RemotingException, MQBrokerException, InterruptedException { final UnregisterClientRequestHeader requestHeader = new UnregisterClientRequestHeader(); requestHeader.setClientID(clientID); @@ -809,12 +876,11 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void endTransactionOneway(// - final String addr, // - final EndTransactionRequestHeader requestHeader, // - final String remark, // - final long timeoutMillis// + final String addr, // + final EndTransactionRequestHeader requestHeader, // + final String remark, // + final long timeoutMillis// ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader); @@ -822,23 +888,21 @@ public class MQClientAPIImpl { this.remotingClient.invokeOneway(addr, request, timeoutMillis); } - public void queryMessage( - final String addr, - final QueryMessageRequestHeader requestHeader, - final long timeoutMillis, - final InvokeCallback invokeCallback, - final Boolean isUnqiueKey + final String addr, + final QueryMessageRequestHeader requestHeader, + final long timeoutMillis, + final InvokeCallback invokeCallback, + final Boolean isUnqiueKey ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, requestHeader); request.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG, isUnqiueKey.toString()); this.remotingClient.invokeAsync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis, - invokeCallback); + invokeCallback); } - public boolean registerClient(final String addr, final HeartbeatData heartbeat, final long timeoutMillis) - throws RemotingException, InterruptedException { + throws RemotingException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); request.setBody(heartbeat.encode()); @@ -846,14 +910,13 @@ public class MQClientAPIImpl { return response.getCode() == ResponseCode.SUCCESS; } - public void consumerSendMessageBack( - final String addr, - final MessageExt msg, - final String consumerGroup, - final int delayLevel, - final long timeoutMillis, - final int maxConsumeRetryTimes + final String addr, + final MessageExt msg, + final String consumerGroup, + final int delayLevel, + final long timeoutMillis, + final int maxConsumeRetryTimes ) throws RemotingException, MQBrokerException, InterruptedException { ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader); @@ -866,7 +929,7 @@ public class MQClientAPIImpl { requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -879,16 +942,15 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public Set<MessageQueue> lockBatchMQ(// - final String addr, // - final LockBatchRequestBody requestBody, // - final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { + final String addr, // + final LockBatchRequestBody requestBody, // + final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null); request.setBody(requestBody.encode()); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); switch (response.getCode()) { case ResponseCode.SUCCESS: { LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class); @@ -902,12 +964,11 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void unlockBatchMQ(// - final String addr, // - final UnlockBatchRequestBody requestBody, // - final long timeoutMillis, // - final boolean oneway// + final String addr, // + final UnlockBatchRequestBody requestBody, // + final long timeoutMillis, // + final boolean oneway// ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null); @@ -917,7 +978,7 @@ public class MQClientAPIImpl { this.remotingClient.invokeOneway(addr, request, timeoutMillis); } else { RemotingCommand response = this.remotingClient - .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); + .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); switch (response.getCode()) { case ResponseCode.SUCCESS: { return; @@ -930,16 +991,15 @@ public class MQClientAPIImpl { } } - public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, final long timeoutMillis) throws InterruptedException, - RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { GetTopicStatsInfoRequestHeader requestHeader = new GetTopicStatsInfoRequestHeader(); requestHeader.setTopic(topic); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); switch (response.getCode()) { case ResponseCode.SUCCESS: { TopicStatsTable topicStatsTable = TopicStatsTable.decode(response.getBody(), TopicStatsTable.class); @@ -952,17 +1012,15 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis) - throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, - MQBrokerException { + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, + MQBrokerException { return getConsumeStats(addr, consumerGroup, null, timeoutMillis); } - public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic, final long timeoutMillis) - throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, - MQBrokerException { + throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, + MQBrokerException { GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); requestHeader.setTopic(topic); @@ -970,7 +1028,7 @@ public class MQClientAPIImpl { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); switch (response.getCode()) { case ResponseCode.SUCCESS: { ConsumeStats consumeStats = ConsumeStats.decode(response.getBody(), ConsumeStats.class); @@ -983,17 +1041,16 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public ProducerConnection getProducerConnectionList(final String addr, final String producerGroup, final long timeoutMillis) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, - MQBrokerException { + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, + MQBrokerException { GetProducerConnectionListRequestHeader requestHeader = new GetProducerConnectionListRequestHeader(); requestHeader.setProducerGroup(producerGroup); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_PRODUCER_CONNECTION_LIST, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); switch (response.getCode()) { case ResponseCode.SUCCESS: { return ProducerConnection.decode(response.getBody(), ProducerConnection.class); @@ -1005,17 +1062,16 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public ConsumerConnection getConsumerConnectionList(final String addr, final String consumerGroup, final long timeoutMillis) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, - MQBrokerException { + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, + MQBrokerException { GetConsumerConnectionListRequestHeader requestHeader = new GetConsumerConnectionListRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); switch (response.getCode()) { case ResponseCode.SUCCESS: { ConsumerConnection consumerConnection = ConsumerConnection.decode(response.getBody(), ConsumerConnection.class); @@ -1028,14 +1084,13 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public KVTable getBrokerRuntimeInfo(final String addr, final long timeoutMillis) throws RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { + RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); switch (response.getCode()) { case ResponseCode.SUCCESS: { return KVTable.decode(response.getBody(), KVTable.class); @@ -1047,10 +1102,9 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public void updateBrokerConfig(final String addr, final Properties properties, final long timeoutMillis) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, - MQBrokerException, UnsupportedEncodingException { + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, + MQBrokerException, UnsupportedEncodingException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_BROKER_CONFIG, null); @@ -1058,7 +1112,7 @@ public class MQClientAPIImpl { if (str != null && str.length() > 0) { request.setBody(str.getBytes(MixAll.DEFAULT_CHARSET)); RemotingCommand response = this.remotingClient - .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); + .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis); switch (response.getCode()) { case ResponseCode.SUCCESS: { return; @@ -1071,10 +1125,9 @@ public class MQClientAPIImpl { } } - public Properties getBrokerConfig(final String addr, final long timeoutMillis) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, - MQBrokerException, UnsupportedEncodingException { + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, + MQBrokerException, UnsupportedEncodingException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONFIG, null); RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); @@ -1091,7 +1144,7 @@ public class MQClientAPIImpl { } public ClusterInfo getBrokerClusterInfo(final long timeoutMillis) throws InterruptedException, RemotingTimeoutException, - RemotingSendRequestException, RemotingConnectException, MQBrokerException { + RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null); RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); @@ -1108,9 +1161,8 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { + throws RemotingException, MQClientException, InterruptedException { GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); requestHeader.setTopic(topic); @@ -1136,9 +1188,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { + throws RemotingException, MQClientException, InterruptedException { GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader(); requestHeader.setTopic(topic); @@ -1165,9 +1216,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public TopicList getTopicListFromNameServer(final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { + throws RemotingException, MQClientException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER, null); RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); @@ -1187,9 +1237,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName, final long timeoutMillis) throws RemotingCommandException, - RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { + RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQClientException { WipeWritePermOfBrokerRequestHeader requestHeader = new WipeWritePermOfBrokerRequestHeader(); requestHeader.setBrokerName(brokerName); @@ -1200,7 +1249,7 @@ public class MQClientAPIImpl { switch (response.getCode()) { case ResponseCode.SUCCESS: { WipeWritePermOfBrokerResponseHeader responseHeader = - (WipeWritePermOfBrokerResponseHeader) response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class); + (WipeWritePermOfBrokerResponseHeader)response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class); return responseHeader.getWipeTopicCount(); } default: @@ -1210,15 +1259,14 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader(); requestHeader.setTopic(topic); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -1231,9 +1279,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public void deleteTopicInNameServer(final String addr, final String topic, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader(); requestHeader.setTopic(topic); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, requestHeader); @@ -1251,15 +1298,14 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public void deleteSubscriptionGroup(final String addr, final String groupName, final long timeoutMillis) - throws RemotingException, MQBrokerException, InterruptedException, MQClientException { + throws RemotingException, MQBrokerException, InterruptedException, MQClientException { DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader(); requestHeader.setGroupName(groupName); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -1272,9 +1318,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public String getKVConfigValue(final String namespace, final String key, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { + throws RemotingException, MQClientException, InterruptedException { GetKVConfigRequestHeader requestHeader = new GetKVConfigRequestHeader(); requestHeader.setNamespace(namespace); requestHeader.setKey(key); @@ -1286,7 +1331,7 @@ public class MQClientAPIImpl { switch (response.getCode()) { case ResponseCode.SUCCESS: { GetKVConfigResponseHeader responseHeader = - (GetKVConfigResponseHeader) response.decodeCommandCustomHeader(GetKVConfigResponseHeader.class); + (GetKVConfigResponseHeader)response.decodeCommandCustomHeader(GetKVConfigResponseHeader.class); return responseHeader.getValue(); } default: @@ -1296,9 +1341,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public void putKVConfigValue(final String namespace, final String key, final String value, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { + throws RemotingException, MQClientException, InterruptedException { PutKVConfigRequestHeader requestHeader = new PutKVConfigRequestHeader(); requestHeader.setNamespace(namespace); requestHeader.setKey(key); @@ -1327,9 +1371,8 @@ public class MQClientAPIImpl { } } - public void deleteKVConfigValue(final String namespace, final String key, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { + throws RemotingException, MQClientException, InterruptedException { DeleteKVConfigRequestHeader requestHeader = new DeleteKVConfigRequestHeader(); requestHeader.setNamespace(namespace); requestHeader.setKey(key); @@ -1356,9 +1399,8 @@ public class MQClientAPIImpl { } } - public KVTable getKVListByNamespace(final String namespace, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { + throws RemotingException, MQClientException, InterruptedException { GetKVListByNamespaceRequestHeader requestHeader = new GetKVListByNamespaceRequestHeader(); requestHeader.setNamespace(namespace); @@ -1377,17 +1419,15 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group, - final long timestamp, final boolean isForce, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { + final long timestamp, final boolean isForce, final long timeoutMillis) + throws RemotingException, MQClientException, InterruptedException { return invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, false); } - public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group, - final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC) - throws RemotingException, MQClientException, InterruptedException { + final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC) + throws RemotingException, MQClientException, InterruptedException { ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader(); requestHeader.setTopic(topic); requestHeader.setGroup(group); @@ -1400,7 +1440,7 @@ public class MQClientAPIImpl { } RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -1416,9 +1456,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr, final String topic, final String group, - final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + final String clientAddr, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader(); requestHeader.setTopic(topic); requestHeader.setGroup(group); @@ -1427,7 +1466,7 @@ public class MQClientAPIImpl { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -1443,17 +1482,16 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public GroupList queryTopicConsumeByWho(final String addr, final String topic, final long timeoutMillis) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, - MQBrokerException { + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, + MQBrokerException { QueryTopicConsumeByWhoRequestHeader requestHeader = new QueryTopicConsumeByWhoRequestHeader(); requestHeader.setTopic(topic); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); switch (response.getCode()) { case ResponseCode.SUCCESS: { GroupList groupList = GroupList.decode(response.getBody(), GroupList.class); @@ -1466,10 +1504,9 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final String topic, final String group, final long timeoutMillis) - throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, - MQBrokerException { + throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, + MQBrokerException { QueryConsumeTimeSpanRequestHeader requestHeader = new QueryConsumeTimeSpanRequestHeader(); requestHeader.setTopic(topic); requestHeader.setGroup(group); @@ -1477,7 +1514,7 @@ public class MQClientAPIImpl { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); switch (response.getCode()) { case ResponseCode.SUCCESS: { QueryConsumeTimeSpanBody consumeTimeSpanBody = GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class); @@ -1490,9 +1527,8 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public TopicList getTopicsByCluster(final String cluster, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { + throws RemotingException, MQClientException, InterruptedException { GetTopicsByClusterRequestHeader requestHeader = new GetTopicsByClusterRequestHeader(); requestHeader.setCluster(cluster); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPICS_BY_CLUSTER, requestHeader); @@ -1514,15 +1550,14 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public void registerMessageFilterClass(final String addr, // - final String consumerGroup, // - final String topic, // - final String className, // - final int classCRC, // - final byte[] classBody, // - final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - InterruptedException, MQBrokerException { + final String consumerGroup, // + final String topic, // + final String className, // + final int classCRC, // + final byte[] classBody, // + final long timeoutMillis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + InterruptedException, MQBrokerException { RegisterMessageFilterClassRequestHeader requestHeader = new RegisterMessageFilterClassRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); requestHeader.setClassName(className); @@ -1543,7 +1578,6 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } - public TopicList getSystemTopicList(final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null); @@ -1555,7 +1589,7 @@ public class MQClientAPIImpl { if (body != null) { TopicList topicList = TopicList.decode(response.getBody(), TopicList.class); if (topicList.getTopicList() != null && !topicList.getTopicList().isEmpty() - && !UtilAll.isBlank(topicList.getBrokerAddr())) { + && !UtilAll.isBlank(topicList.getBrokerAddr())) { TopicList tmp = getSystemTopicListFromBroker(topicList.getBrokerAddr(), timeoutMillis); if (tmp.getTopicList() != null && !tmp.getTopicList().isEmpty()) { topicList.getTopicList().addAll(tmp.getTopicList()); @@ -1571,13 +1605,12 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public TopicList getSystemTopicListFromBroker(final String addr, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { + throws RemotingException, MQClientException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER, null); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -1594,12 +1627,11 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public boolean cleanExpiredConsumeQueue(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException, InterruptedException { + RemotingSendRequestException, RemotingTimeoutException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, null); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); switch (response.getCode()) { case ResponseCode.SUCCESS: { return true; @@ -1611,12 +1643,11 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public boolean cleanUnusedTopicByAddr(final String addr, long timeoutMillis) throws MQClientException, RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException, InterruptedException { + RemotingSendRequestException, RemotingTimeoutException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); switch (response.getCode()) { case ResponseCode.SUCCESS: { return true; @@ -1629,7 +1660,7 @@ public class MQClientAPIImpl { } public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup, String clientId, boolean jstack, - final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { GetConsumerRunningInfoRequestHeader requestHeader = new GetConsumerRunningInfoRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); requestHeader.setClientId(clientId); @@ -1638,7 +1669,7 @@ public class MQClientAPIImpl { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -1656,10 +1687,10 @@ public class MQClientAPIImpl { } public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr, // - String consumerGroup, // - String clientId, // - String msgId, // - final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + String consumerGroup, // + String clientId, // + String msgId, // + final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { ConsumeMessageDirectlyResultRequestHeader requestHeader = new ConsumeMessageDirectlyResultRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); requestHeader.setClientId(clientId); @@ -1668,7 +1699,7 @@ public class MQClientAPIImpl { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -1686,8 +1717,8 @@ public class MQClientAPIImpl { } public Map<Integer, Long> queryCorrectionOffset(final String addr, final String topic, final String group, Set<String> filterGroup, - long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - InterruptedException { + long timeoutMillis) throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + InterruptedException { QueryCorrectionOffsetHeader requestHeader = new QueryCorrectionOffsetHeader(); requestHeader.setCompareGroup(group); requestHeader.setTopic(topic); @@ -1703,7 +1734,7 @@ public class MQClientAPIImpl { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CORRECTION_OFFSET, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -1720,7 +1751,7 @@ public class MQClientAPIImpl { } public TopicList getUnitTopicList(final boolean containRetry, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { + throws RemotingException, MQClientException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_UNIT_TOPIC_LIST, null); RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); @@ -1749,9 +1780,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public TopicList getHasUnitSubTopicList(final boolean containRetry, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { + throws RemotingException, MQClientException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST, null); RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); @@ -1779,9 +1809,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public TopicList getHasUnitSubUnUnitTopicList(final boolean containRetry, final long timeoutMillis) - throws RemotingException, MQClientException, InterruptedException { + throws RemotingException, MQClientException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST, null); RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis); @@ -1809,9 +1838,8 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public void cloneGroupOffset(final String addr, final String srcGroup, final String destGroup, final String topic, - final boolean isOffline, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { + final boolean isOffline, final long timeoutMillis) throws RemotingException, MQClientException, InterruptedException { CloneGroupOffsetRequestHeader requestHeader = new CloneGroupOffsetRequestHeader(); requestHeader.setSrcGroup(srcGroup); requestHeader.setDestGroup(destGroup); @@ -1820,7 +1848,7 @@ public class MQClientAPIImpl { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLONE_GROUP_OFFSET, requestHeader); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), - request, timeoutMillis); + request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -1833,10 +1861,9 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public BrokerStatsData viewBrokerStatsData(String brokerAddr, String statsName, String statsKey, long timeoutMillis) - throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, - InterruptedException { + throws MQClientException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, + InterruptedException { ViewBrokerStatsDataRequestHeader requestHeader = new ViewBrokerStatsDataRequestHeader(); requestHeader.setStatsName(statsName); requestHeader.setStatsKey(statsKey); @@ -1844,7 +1871,7 @@ public class MQClientAPIImpl { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_BROKER_STATS_DATA, requestHeader); RemotingCommand response = this.remotingClient - .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis); + .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -1860,23 +1887,21 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public Set<String> getClusterList(String topic, long timeoutMillis) throws MQClientException, RemotingConnectException, - RemotingSendRequestException, RemotingTimeoutException, InterruptedException { + RemotingSendRequestException, RemotingTimeoutException, InterruptedException { // todo:jodie return Collections.EMPTY_SET; } - public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, boolean isOrder, long timeoutMillis) throws MQClientException, - RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { + RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { GetConsumeStatsInBrokerHeader requestHeader = new GetConsumeStatsInBrokerHeader(); requestHeader.setIsOrder(isOrder); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONSUME_STATS, requestHeader); RemotingCommand response = this.remotingClient - .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis); + .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -1892,12 +1917,11 @@ public class MQClientAPIImpl { throw new MQClientException(response.getCode(), response.getRemark()); } - public SubscriptionGroupWrapper getAllSubscriptionGroup(final String brokerAddr, long timeoutMillis) throws InterruptedException, - RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { + RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); RemotingCommand response = this.remotingClient - .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis); + .invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), brokerAddr), request, timeoutMillis); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { @@ -1909,13 +1933,12 @@ public class MQClientAPIImpl { throw new MQBrokerException(response.getCode(), response.getRemark()); } -
<TRUNCATED>
