http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/388ba7a5/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 84ee7db..2dd9200 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -16,6 +16,17 @@
  */
 package org.apache.rocketmq.client.impl;
 
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.PullCallback;
 import org.apache.rocketmq.client.consumer.PullResult;
@@ -37,21 +48,97 @@ import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.admin.ConsumeStats;
 import org.apache.rocketmq.common.admin.TopicStatsTable;
-import org.apache.rocketmq.common.message.*;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
-import org.apache.rocketmq.common.protocol.body.*;
-import org.apache.rocketmq.common.protocol.header.*;
+import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
+import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
+import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
+import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
+import org.apache.rocketmq.common.protocol.body.GetConsumerStatusBody;
+import org.apache.rocketmq.common.protocol.body.GroupList;
+import org.apache.rocketmq.common.protocol.body.KVTable;
+import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.body.LockBatchResponseBody;
+import org.apache.rocketmq.common.protocol.body.ProducerConnection;
+import org.apache.rocketmq.common.protocol.body.QueryConsumeTimeSpanBody;
+import org.apache.rocketmq.common.protocol.body.QueryCorrectionOffsetBody;
+import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
+import org.apache.rocketmq.common.protocol.body.ResetOffsetBody;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import 
org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
+import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.DeleteSubscriptionGroupRequestHeader;
+import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
+import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.GetConsumeStatsInBrokerHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumeStatsRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.GetConsumerConnectionListRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
+import 
org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.GetProducerConnectionListRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.GetTopicStatsInfoRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.GetTopicsByClusterRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.QueryConsumeTimeSpanRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryCorrectionOffsetHeader;
+import org.apache.rocketmq.common.protocol.header.QueryMessageRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.QueryTopicConsumeByWhoRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.UnregisterClientRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.ViewBrokerStatsDataRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ViewMessageRequestHeader;
 import 
org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
-import org.apache.rocketmq.common.protocol.header.namesrv.*;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.DeleteKVConfigRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.GetKVConfigResponseHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.GetKVListByNamespaceRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.PutKVConfigRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerRequestHeader;
+import 
org.apache.rocketmq.common.protocol.header.namesrv.WipeWritePermOfBrokerResponseHeader;
 import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.RemotingClient;
-import org.apache.rocketmq.remoting.exception.*;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
 import org.apache.rocketmq.remoting.netty.NettyClientConfig;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.apache.rocketmq.remoting.netty.ResponseFuture;
@@ -60,17 +147,11 @@ import 
org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.slf4j.Logger;
 
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
 public class MQClientAPIImpl {
 
     private final static Logger log = ClientLogger.getLog();
     public static boolean sendSmartMsg =
-            
Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg",
 "true"));
+        
Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg",
 "true"));
 
     static {
         System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, 
Integer.toString(MQVersion.CURRENT_VERSION));
@@ -83,7 +164,7 @@ public class MQClientAPIImpl {
     private ClientConfig clientConfig;
 
     public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final 
ClientRemotingProcessor clientRemotingProcessor,
-                           RPCHook rpcHook, final ClientConfig clientConfig) {
+        RPCHook rpcHook, final ClientConfig clientConfig) {
         this.clientConfig = clientConfig;
         topAddressing = new TopAddressing(MixAll.WS_ADDR, 
clientConfig.getUnitName());
         this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
@@ -149,14 +230,14 @@ public class MQClientAPIImpl {
     }
 
     public void createSubscriptionGroup(final String addr, final 
SubscriptionGroupConfig config, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP,
 null);
 
         byte[] body = RemotingSerializable.encode(config);
         request.setBody(body);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -171,7 +252,7 @@ public class MQClientAPIImpl {
     }
 
     public void createTopic(final String addr, final String defaultTopic, 
final TopicConfig topicConfig, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
         CreateTopicRequestHeader requestHeader = new 
CreateTopicRequestHeader();
         requestHeader.setTopic(topicConfig.getTopicName());
         requestHeader.setDefaultTopic(defaultTopic);
@@ -185,7 +266,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -199,31 +280,31 @@ public class MQClientAPIImpl {
     }
 
     public SendResult sendMessage(//
-                                  final String addr, // 1
-                                  final String brokerName, // 2
-                                  final Message msg, // 3
-                                  final SendMessageRequestHeader 
requestHeader, // 4
-                                  final long timeoutMillis, // 5
-                                  final CommunicationMode communicationMode, 
// 6
-                                  final SendMessageContext context, // 7
-                                  final DefaultMQProducerImpl producer // 8
+        final String addr, // 1
+        final String brokerName, // 2
+        final Message msg, // 3
+        final SendMessageRequestHeader requestHeader, // 4
+        final long timeoutMillis, // 5
+        final CommunicationMode communicationMode, // 6
+        final SendMessageContext context, // 7
+        final DefaultMQProducerImpl producer // 8
     ) throws RemotingException, MQBrokerException, InterruptedException {
         return sendMessage(addr, brokerName, msg, requestHeader, 
timeoutMillis, communicationMode, null, null, null, 0, context, producer);
     }
 
     public SendResult sendMessage(//
-                                  final String addr, // 1
-                                  final String brokerName, // 2
-                                  final Message msg, // 3
-                                  final SendMessageRequestHeader 
requestHeader, // 4
-                                  final long timeoutMillis, // 5
-                                  final CommunicationMode communicationMode, 
// 6
-                                  final SendCallback sendCallback, // 7
-                                  final TopicPublishInfo topicPublishInfo, // 8
-                                  final MQClientInstance instance, // 9
-                                  final int retryTimesWhenSendFailed, // 10
-                                  final SendMessageContext context, // 11
-                                  final DefaultMQProducerImpl producer // 12
+        final String addr, // 1
+        final String brokerName, // 2
+        final Message msg, // 3
+        final SendMessageRequestHeader requestHeader, // 4
+        final long timeoutMillis, // 5
+        final CommunicationMode communicationMode, // 6
+        final SendCallback sendCallback, // 7
+        final TopicPublishInfo topicPublishInfo, // 8
+        final MQClientInstance instance, // 9
+        final int retryTimesWhenSendFailed, // 10
+        final SendMessageContext context, // 11
+        final DefaultMQProducerImpl producer // 12
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = null;
         if (sendSmartMsg) {
@@ -242,7 +323,7 @@ public class MQClientAPIImpl {
             case ASYNC:
                 final AtomicInteger times = new AtomicInteger();
                 this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, 
request, sendCallback, topicPublishInfo, instance,
-                        retryTimesWhenSendFailed, times, context, producer);
+                    retryTimesWhenSendFailed, times, context, producer);
                 return null;
             case SYNC:
                 return this.sendMessageSync(addr, brokerName, msg, 
timeoutMillis, request);
@@ -255,11 +336,11 @@ public class MQClientAPIImpl {
     }
 
     private SendResult sendMessageSync(//
-                                       final String addr, //
-                                       final String brokerName, //
-                                       final Message msg, //
-                                       final long timeoutMillis, //
-                                       final RemotingCommand request//
+        final String addr, //
+        final String brokerName, //
+        final Message msg, //
+        final long timeoutMillis, //
+        final RemotingCommand request//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand response = this.remotingClient.invokeSync(addr, 
request, timeoutMillis);
         assert response != null;
@@ -267,18 +348,18 @@ public class MQClientAPIImpl {
     }
 
     private void sendMessageAsync(//
-                                  final String addr, //
-                                  final String brokerName, //
-                                  final Message msg, //
-                                  final long timeoutMillis, //
-                                  final RemotingCommand request, //
-                                  final SendCallback sendCallback, //
-                                  final TopicPublishInfo topicPublishInfo, //
-                                  final MQClientInstance instance, //
-                                  final int retryTimesWhenSendFailed, //
-                                  final AtomicInteger times, //
-                                  final SendMessageContext context, //
-                                  final DefaultMQProducerImpl producer //
+        final String addr, //
+        final String brokerName, //
+        final Message msg, //
+        final long timeoutMillis, //
+        final RemotingCommand request, //
+        final SendCallback sendCallback, //
+        final TopicPublishInfo topicPublishInfo, //
+        final MQClientInstance instance, //
+        final int retryTimesWhenSendFailed, //
+        final AtomicInteger times, //
+        final SendMessageContext context, //
+        final DefaultMQProducerImpl producer //
     ) throws InterruptedException, RemotingException {
         this.remotingClient.invokeAsync(addr, request, timeoutMillis, new 
InvokeCallback() {
             @Override
@@ -318,68 +399,67 @@ public class MQClientAPIImpl {
                     } catch (Exception e) {
                         producer.updateFaultItem(brokerName, 
System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                         onExceptionImpl(brokerName, msg, 0L, request, 
sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, e, context, 
false, producer);
+                            retryTimesWhenSendFailed, times, e, context, 
false, producer);
                     }
                 } else {
                     producer.updateFaultItem(brokerName, 
System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
                     if (!responseFuture.isSendRequestOK()) {
                         MQClientException ex = new MQClientException("send 
request failed", responseFuture.getCause());
                         onExceptionImpl(brokerName, msg, 0L, request, 
sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, ex, context, 
true, producer);
+                            retryTimesWhenSendFailed, times, ex, context, 
true, producer);
                     } else if (responseFuture.isTimeout()) {
                         MQClientException ex = new MQClientException("wait 
response timeout " + responseFuture.getTimeoutMillis() + "ms",
-                                responseFuture.getCause());
+                            responseFuture.getCause());
                         onExceptionImpl(brokerName, msg, 0L, request, 
sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, ex, context, 
true, producer);
+                            retryTimesWhenSendFailed, times, ex, context, 
true, producer);
                     } else {
                         MQClientException ex = new MQClientException("unknow 
reseaon", responseFuture.getCause());
                         onExceptionImpl(brokerName, msg, 0L, request, 
sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, ex, context, 
true, producer);
+                            retryTimesWhenSendFailed, times, ex, context, 
true, producer);
                     }
                 }
             }
         });
     }
 
-
     private void onExceptionImpl(final String brokerName, //
-                                 final Message msg, //
-                                 final long timeoutMillis, //
-                                 final RemotingCommand request, //
-                                 final SendCallback sendCallback, //
-                                 final TopicPublishInfo topicPublishInfo, //
-                                 final MQClientInstance instance, //
-                                 final int timesTotal, //
-                                 final AtomicInteger curTimes, //
-                                 final Exception e, //
-                                 final SendMessageContext context, //
-                                 final boolean needRetry, //
-                                 final DefaultMQProducerImpl producer // 12
+        final Message msg, //
+        final long timeoutMillis, //
+        final RemotingCommand request, //
+        final SendCallback sendCallback, //
+        final TopicPublishInfo topicPublishInfo, //
+        final MQClientInstance instance, //
+        final int timesTotal, //
+        final AtomicInteger curTimes, //
+        final Exception e, //
+        final SendMessageContext context, //
+        final boolean needRetry, //
+        final DefaultMQProducerImpl producer // 12
     ) {
         int tmp = curTimes.incrementAndGet();
         if (needRetry && tmp <= timesTotal) {
             MessageQueue tmpmq = 
producer.selectOneMessageQueue(topicPublishInfo, brokerName);
             String addr = 
instance.findBrokerAddressInPublish(tmpmq.getBrokerName());
             log.info("async send msg by retry {} times. topic={}, 
brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
-                    tmpmq.getBrokerName());
+                tmpmq.getBrokerName());
             try {
                 request.setOpaque(RemotingCommand.createNewRequestId());
                 sendMessageAsync(addr, tmpmq.getBrokerName(), msg, 
timeoutMillis, request, sendCallback, topicPublishInfo, instance,
-                        timesTotal, curTimes, context, producer);
+                    timesTotal, curTimes, context, producer);
             } catch (InterruptedException e1) {
                 onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, 
request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
-                        context, false, producer);
+                    context, false, producer);
             } catch (RemotingConnectException e1) {
                 producer.updateFaultItem(brokerName, 3000, true);
                 onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, 
request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
-                        context, true, producer);
+                    context, true, producer);
             } catch (RemotingTooMuchRequestException e1) {
                 onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, 
request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
-                        context, false, producer);
+                    context, false, producer);
             } catch (RemotingException e1) {
                 producer.updateFaultItem(brokerName, 3000, true);
                 onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, 
request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
-                        context, true, producer);
+                    context, true, producer);
             }
         } else {
             if (context != null) {
@@ -393,11 +473,10 @@ public class MQClientAPIImpl {
         }
     }
 
-
     private SendResult processSendResponse(//
-                                           final String brokerName, //
-                                           final Message msg, //
-                                           final RemotingCommand response//
+        final String brokerName, //
+        final Message msg, //
+        final RemotingCommand response//
     ) throws MQBrokerException, RemotingCommandException {
         switch (response.getCode()) {
             case ResponseCode.FLUSH_DISK_TIMEOUT:
@@ -426,13 +505,13 @@ public class MQClientAPIImpl {
                 }
 
                 SendMessageResponseHeader responseHeader =
-                        (SendMessageResponseHeader) 
response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
+                    
(SendMessageResponseHeader)response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
 
                 MessageQueue messageQueue = new MessageQueue(msg.getTopic(), 
brokerName, responseHeader.getQueueId());
 
                 SendResult sendResult = new SendResult(sendStatus,
-                        MessageClientIDSetter.getUniqID(msg),
-                        responseHeader.getMsgId(), messageQueue, 
responseHeader.getQueueOffset());
+                    MessageClientIDSetter.getUniqID(msg),
+                    responseHeader.getMsgId(), messageQueue, 
responseHeader.getQueueOffset());
                 sendResult.setTransactionId(responseHeader.getTransactionId());
                 String regionId = 
response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
                 String traceOn = 
response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
@@ -454,13 +533,12 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public PullResult pullMessage(//
-                                  final String addr, //
-                                  final PullMessageRequestHeader 
requestHeader, //
-                                  final long timeoutMillis, //
-                                  final CommunicationMode communicationMode, //
-                                  final PullCallback pullCallback//
+        final String addr, //
+        final PullMessageRequestHeader requestHeader, //
+        final long timeoutMillis, //
+        final CommunicationMode communicationMode, //
+        final PullCallback pullCallback//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
 
@@ -481,12 +559,11 @@ public class MQClientAPIImpl {
         return null;
     }
 
-
     private void pullMessageAsync(//
-                                  final String addr, // 1
-                                  final RemotingCommand request, //
-                                  final long timeoutMillis, //
-                                  final PullCallback pullCallback//
+        final String addr, // 1
+        final RemotingCommand request, //
+        final long timeoutMillis, //
+        final PullCallback pullCallback//
     ) throws RemotingException, InterruptedException {
         this.remotingClient.invokeAsync(addr, request, timeoutMillis, new 
InvokeCallback() {
             @Override
@@ -505,7 +582,7 @@ public class MQClientAPIImpl {
                         pullCallback.onException(new MQClientException("send 
request failed", responseFuture.getCause()));
                     } else if (responseFuture.isTimeout()) {
                         pullCallback.onException(new MQClientException("wait 
response timeout " + responseFuture.getTimeoutMillis() + "ms",
-                                responseFuture.getCause()));
+                            responseFuture.getCause()));
                     } else {
                         pullCallback.onException(new MQClientException("unknow 
reseaon", responseFuture.getCause()));
                     }
@@ -515,9 +592,9 @@ public class MQClientAPIImpl {
     }
 
     private PullResult pullMessageSync(//
-                                       final String addr, // 1
-                                       final RemotingCommand request, // 2
-                                       final long timeoutMillis// 3
+        final String addr, // 1
+        final RemotingCommand request, // 2
+        final long timeoutMillis// 3
     ) throws RemotingException, InterruptedException, MQBrokerException {
         RemotingCommand response = this.remotingClient.invokeSync(addr, 
request, timeoutMillis);
         assert response != null;
@@ -545,20 +622,20 @@ public class MQClientAPIImpl {
         }
 
         PullMessageResponseHeader responseHeader =
-                (PullMessageResponseHeader) 
response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
+            
(PullMessageResponseHeader)response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
 
         return new PullResultExt(pullStatus, 
responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
-                responseHeader.getMaxOffset(), null, 
responseHeader.getSuggestWhichBrokerId(), response.getBody());
+            responseHeader.getMaxOffset(), null, 
responseHeader.getSuggestWhichBrokerId(), response.getBody());
     }
 
     public MessageExt viewMessage(final String addr, final long phyoffset, 
final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
+        throws RemotingException, MQBrokerException, InterruptedException {
         ViewMessageRequestHeader requestHeader = new 
ViewMessageRequestHeader();
         requestHeader.setOffset(phyoffset);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -573,9 +650,8 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public long searchOffset(final String addr, final String topic, final int 
queueId, final long timestamp, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
+        throws RemotingException, MQBrokerException, InterruptedException {
         SearchOffsetRequestHeader requestHeader = new 
SearchOffsetRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
@@ -583,12 +659,12 @@ public class MQClientAPIImpl {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 SearchOffsetResponseHeader responseHeader =
-                        (SearchOffsetResponseHeader) 
response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
+                    
(SearchOffsetResponseHeader)response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
                 return responseHeader.getOffset();
             }
             default:
@@ -598,21 +674,20 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public long getMaxOffset(final String addr, final String topic, final int 
queueId, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
+        throws RemotingException, MQBrokerException, InterruptedException {
         GetMaxOffsetRequestHeader requestHeader = new 
GetMaxOffsetRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 GetMaxOffsetResponseHeader responseHeader =
-                        (GetMaxOffsetResponseHeader) 
response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
+                    
(GetMaxOffsetResponseHeader)response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
 
                 return responseHeader.getOffset();
             }
@@ -623,24 +698,23 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public List<String> getConsumerIdListByGroup(//
-                                                 final String addr, //
-                                                 final String consumerGroup, //
-                                                 final long timeoutMillis) 
throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
-            MQBrokerException, InterruptedException {
+        final String addr, //
+        final String consumerGroup, //
+        final long timeoutMillis) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException,
+        MQBrokerException, InterruptedException {
         GetConsumerListByGroupRequestHeader requestHeader = new 
GetConsumerListByGroupRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 if (response.getBody() != null) {
                     GetConsumerListByGroupResponseBody body =
-                            
GetConsumerListByGroupResponseBody.decode(response.getBody(), 
GetConsumerListByGroupResponseBody.class);
+                        
GetConsumerListByGroupResponseBody.decode(response.getBody(), 
GetConsumerListByGroupResponseBody.class);
                     return body.getConsumerIdList();
                 }
             }
@@ -651,21 +725,20 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public long getMinOffset(final String addr, final String topic, final int 
queueId, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
+        throws RemotingException, MQBrokerException, InterruptedException {
         GetMinOffsetRequestHeader requestHeader = new 
GetMinOffsetRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 GetMinOffsetResponseHeader responseHeader =
-                        (GetMinOffsetResponseHeader) 
response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
+                    
(GetMinOffsetResponseHeader)response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
 
                 return responseHeader.getOffset();
             }
@@ -676,21 +749,20 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public long getEarliestMsgStoretime(final String addr, final String topic, 
final int queueId, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
+        throws RemotingException, MQBrokerException, InterruptedException {
         GetEarliestMsgStoretimeRequestHeader requestHeader = new 
GetEarliestMsgStoretimeRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setQueueId(queueId);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 GetEarliestMsgStoretimeResponseHeader responseHeader =
-                        (GetEarliestMsgStoretimeResponseHeader) 
response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
+                    
(GetEarliestMsgStoretimeResponseHeader)response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
 
                 return responseHeader.getTimestamp();
             }
@@ -701,21 +773,20 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public long queryConsumerOffset(//
-                                    final String addr, //
-                                    final QueryConsumerOffsetRequestHeader 
requestHeader, //
-                                    final long timeoutMillis//
+        final String addr, //
+        final QueryConsumerOffsetRequestHeader requestHeader, //
+        final long timeoutMillis//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 QueryConsumerOffsetResponseHeader responseHeader =
-                        (QueryConsumerOffsetResponseHeader) 
response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
+                    
(QueryConsumerOffsetResponseHeader)response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
 
                 return responseHeader.getOffset();
             }
@@ -726,16 +797,15 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public void updateConsumerOffset(//
-                                     final String addr, //
-                                     final UpdateConsumerOffsetRequestHeader 
requestHeader, //
-                                     final long timeoutMillis//
+        final String addr, //
+        final UpdateConsumerOffsetRequestHeader requestHeader, //
+        final long timeoutMillis//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -748,23 +818,21 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public void updateConsumerOffsetOneway(//
-                                           final String addr, //
-                                           final 
UpdateConsumerOffsetRequestHeader requestHeader, //
-                                           final long timeoutMillis//
+        final String addr, //
+        final UpdateConsumerOffsetRequestHeader requestHeader, //
+        final long timeoutMillis//
     ) throws RemotingConnectException, RemotingTooMuchRequestException, 
RemotingTimeoutException, RemotingSendRequestException,
-            InterruptedException {
+        InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, 
requestHeader);
 
         
this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr), request, timeoutMillis);
     }
 
-
     public void sendHearbeat(//
-                             final String addr, //
-                             final HeartbeatData heartbeatData, //
-                             final long timeoutMillis//
+        final String addr, //
+        final HeartbeatData heartbeatData, //
+        final long timeoutMillis//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
 
@@ -782,13 +850,12 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public void unregisterClient(//
-                                 final String addr, //
-                                 final String clientID, //
-                                 final String producerGroup, //
-                                 final String consumerGroup, //
-                                 final long timeoutMillis//
+        final String addr, //
+        final String clientID, //
+        final String producerGroup, //
+        final String consumerGroup, //
+        final long timeoutMillis//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         final UnregisterClientRequestHeader requestHeader = new 
UnregisterClientRequestHeader();
         requestHeader.setClientID(clientID);
@@ -809,12 +876,11 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public void endTransactionOneway(//
-                                     final String addr, //
-                                     final EndTransactionRequestHeader 
requestHeader, //
-                                     final String remark, //
-                                     final long timeoutMillis//
+        final String addr, //
+        final EndTransactionRequestHeader requestHeader, //
+        final String remark, //
+        final long timeoutMillis//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, 
requestHeader);
 
@@ -822,23 +888,21 @@ public class MQClientAPIImpl {
         this.remotingClient.invokeOneway(addr, request, timeoutMillis);
     }
 
-
     public void queryMessage(
-            final String addr,
-            final QueryMessageRequestHeader requestHeader,
-            final long timeoutMillis,
-            final InvokeCallback invokeCallback,
-            final Boolean isUnqiueKey
+        final String addr,
+        final QueryMessageRequestHeader requestHeader,
+        final long timeoutMillis,
+        final InvokeCallback invokeCallback,
+        final Boolean isUnqiueKey
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, requestHeader);
         request.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG, 
isUnqiueKey.toString());
         
this.remotingClient.invokeAsync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr), request, timeoutMillis,
-                invokeCallback);
+            invokeCallback);
     }
 
-
     public boolean registerClient(final String addr, final HeartbeatData 
heartbeat, final long timeoutMillis)
-            throws RemotingException, InterruptedException {
+        throws RemotingException, InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
 
         request.setBody(heartbeat.encode());
@@ -846,14 +910,13 @@ public class MQClientAPIImpl {
         return response.getCode() == ResponseCode.SUCCESS;
     }
 
-
     public void consumerSendMessageBack(
-            final String addr,
-            final MessageExt msg,
-            final String consumerGroup,
-            final int delayLevel,
-            final long timeoutMillis,
-            final int maxConsumeRetryTimes
+        final String addr,
+        final MessageExt msg,
+        final String consumerGroup,
+        final int delayLevel,
+        final long timeoutMillis,
+        final int maxConsumeRetryTimes
     ) throws RemotingException, MQBrokerException, InterruptedException {
         ConsumerSendMsgBackRequestHeader requestHeader = new 
ConsumerSendMsgBackRequestHeader();
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, 
requestHeader);
@@ -866,7 +929,7 @@ public class MQClientAPIImpl {
         requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -879,16 +942,15 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public Set<MessageQueue> lockBatchMQ(//
-                                         final String addr, //
-                                         final LockBatchRequestBody 
requestBody, //
-                                         final long timeoutMillis) throws 
RemotingException, MQBrokerException, InterruptedException {
+        final String addr, //
+        final LockBatchRequestBody requestBody, //
+        final long timeoutMillis) throws RemotingException, MQBrokerException, 
InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
 
         request.setBody(requestBody.encode());
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 LockBatchResponseBody responseBody = 
LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
@@ -902,12 +964,11 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public void unlockBatchMQ(//
-                              final String addr, //
-                              final UnlockBatchRequestBody requestBody, //
-                              final long timeoutMillis, //
-                              final boolean oneway//
+        final String addr, //
+        final UnlockBatchRequestBody requestBody, //
+        final long timeoutMillis, //
+        final boolean oneway//
     ) throws RemotingException, MQBrokerException, InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
 
@@ -917,7 +978,7 @@ public class MQClientAPIImpl {
             this.remotingClient.invokeOneway(addr, request, timeoutMillis);
         } else {
             RemotingCommand response = this.remotingClient
-                    
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
addr), request, timeoutMillis);
+                
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
addr), request, timeoutMillis);
             switch (response.getCode()) {
                 case ResponseCode.SUCCESS: {
                     return;
@@ -930,16 +991,15 @@ public class MQClientAPIImpl {
         }
     }
 
-
     public TopicStatsTable getTopicStatsInfo(final String addr, final String 
topic, final long timeoutMillis) throws InterruptedException,
-            RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQBrokerException {
+        RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQBrokerException {
         GetTopicStatsInfoRequestHeader requestHeader = new 
GetTopicStatsInfoRequestHeader();
         requestHeader.setTopic(topic);
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 TopicStatsTable topicStatsTable = 
TopicStatsTable.decode(response.getBody(), TopicStatsTable.class);
@@ -952,17 +1012,15 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public ConsumeStats getConsumeStats(final String addr, final String 
consumerGroup, final long timeoutMillis)
-            throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException,
-            MQBrokerException {
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException,
+        MQBrokerException {
         return getConsumeStats(addr, consumerGroup, null, timeoutMillis);
     }
 
-
     public ConsumeStats getConsumeStats(final String addr, final String 
consumerGroup, final String topic, final long timeoutMillis)
-            throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException,
-            MQBrokerException {
+        throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException,
+        MQBrokerException {
         GetConsumeStatsRequestHeader requestHeader = new 
GetConsumeStatsRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         requestHeader.setTopic(topic);
@@ -970,7 +1028,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 ConsumeStats consumeStats = 
ConsumeStats.decode(response.getBody(), ConsumeStats.class);
@@ -983,17 +1041,16 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public ProducerConnection getProducerConnectionList(final String addr, 
final String producerGroup, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException,
-            MQBrokerException {
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException,
+        MQBrokerException {
         GetProducerConnectionListRequestHeader requestHeader = new 
GetProducerConnectionListRequestHeader();
         requestHeader.setProducerGroup(producerGroup);
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_PRODUCER_CONNECTION_LIST, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 return ProducerConnection.decode(response.getBody(), 
ProducerConnection.class);
@@ -1005,17 +1062,16 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public ConsumerConnection getConsumerConnectionList(final String addr, 
final String consumerGroup, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException,
-            MQBrokerException {
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException,
+        MQBrokerException {
         GetConsumerConnectionListRequestHeader requestHeader = new 
GetConsumerConnectionListRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 ConsumerConnection consumerConnection = 
ConsumerConnection.decode(response.getBody(), ConsumerConnection.class);
@@ -1028,14 +1084,13 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public KVTable getBrokerRuntimeInfo(final String addr, final long 
timeoutMillis) throws RemotingConnectException,
-            RemotingSendRequestException, RemotingTimeoutException, 
InterruptedException, MQBrokerException {
+        RemotingSendRequestException, RemotingTimeoutException, 
InterruptedException, MQBrokerException {
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 return KVTable.decode(response.getBody(), KVTable.class);
@@ -1047,10 +1102,9 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public void updateBrokerConfig(final String addr, final Properties 
properties, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException,
-            MQBrokerException, UnsupportedEncodingException {
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException,
+        MQBrokerException, UnsupportedEncodingException {
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.UPDATE_BROKER_CONFIG, null);
 
@@ -1058,7 +1112,7 @@ public class MQClientAPIImpl {
         if (str != null && str.length() > 0) {
             request.setBody(str.getBytes(MixAll.DEFAULT_CHARSET));
             RemotingCommand response = this.remotingClient
-                    
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
addr), request, timeoutMillis);
+                
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
addr), request, timeoutMillis);
             switch (response.getCode()) {
                 case ResponseCode.SUCCESS: {
                     return;
@@ -1071,10 +1125,9 @@ public class MQClientAPIImpl {
         }
     }
 
-
     public Properties getBrokerConfig(final String addr, final long 
timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException,
-            MQBrokerException, UnsupportedEncodingException {
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException,
+        MQBrokerException, UnsupportedEncodingException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONFIG, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(addr, 
request, timeoutMillis);
@@ -1091,7 +1144,7 @@ public class MQClientAPIImpl {
     }
 
     public ClusterInfo getBrokerClusterInfo(final long timeoutMillis) throws 
InterruptedException, RemotingTimeoutException,
-            RemotingSendRequestException, RemotingConnectException, 
MQBrokerException {
+        RemotingSendRequestException, RemotingConnectException, 
MQBrokerException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(null, 
request, timeoutMillis);
@@ -1108,9 +1161,8 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String 
topic, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         GetRouteInfoRequestHeader requestHeader = new 
GetRouteInfoRequestHeader();
         requestHeader.setTopic(topic);
 
@@ -1136,9 +1188,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, 
final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         GetRouteInfoRequestHeader requestHeader = new 
GetRouteInfoRequestHeader();
         requestHeader.setTopic(topic);
 
@@ -1165,9 +1216,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public TopicList getTopicListFromNameServer(final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER,
 null);
 
         RemotingCommand response = this.remotingClient.invokeSync(null, 
request, timeoutMillis);
@@ -1187,9 +1237,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public int wipeWritePermOfBroker(final String namesrvAddr, String 
brokerName, final long timeoutMillis) throws RemotingCommandException,
-            RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQClientException {
+        RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException, MQClientException {
         WipeWritePermOfBrokerRequestHeader requestHeader = new 
WipeWritePermOfBrokerRequestHeader();
         requestHeader.setBrokerName(brokerName);
 
@@ -1200,7 +1249,7 @@ public class MQClientAPIImpl {
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 WipeWritePermOfBrokerResponseHeader responseHeader =
-                        (WipeWritePermOfBrokerResponseHeader) 
response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
+                    
(WipeWritePermOfBrokerResponseHeader)response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
                 return responseHeader.getWipeTopicCount();
             }
             default:
@@ -1210,15 +1259,14 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public void deleteTopicInBroker(final String addr, final String topic, 
final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
         DeleteTopicRequestHeader requestHeader = new 
DeleteTopicRequestHeader();
         requestHeader.setTopic(topic);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1231,9 +1279,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public void deleteTopicInNameServer(final String addr, final String topic, 
final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
         DeleteTopicRequestHeader requestHeader = new 
DeleteTopicRequestHeader();
         requestHeader.setTopic(topic);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, 
requestHeader);
@@ -1251,15 +1298,14 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public void deleteSubscriptionGroup(final String addr, final String 
groupName, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
+        throws RemotingException, MQBrokerException, InterruptedException, 
MQClientException {
         DeleteSubscriptionGroupRequestHeader requestHeader = new 
DeleteSubscriptionGroupRequestHeader();
         requestHeader.setGroupName(groupName);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1272,9 +1318,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public String getKVConfigValue(final String namespace, final String key, 
final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         GetKVConfigRequestHeader requestHeader = new 
GetKVConfigRequestHeader();
         requestHeader.setNamespace(namespace);
         requestHeader.setKey(key);
@@ -1286,7 +1331,7 @@ public class MQClientAPIImpl {
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 GetKVConfigResponseHeader responseHeader =
-                        (GetKVConfigResponseHeader) 
response.decodeCommandCustomHeader(GetKVConfigResponseHeader.class);
+                    
(GetKVConfigResponseHeader)response.decodeCommandCustomHeader(GetKVConfigResponseHeader.class);
                 return responseHeader.getValue();
             }
             default:
@@ -1296,9 +1341,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public void putKVConfigValue(final String namespace, final String key, 
final String value, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         PutKVConfigRequestHeader requestHeader = new 
PutKVConfigRequestHeader();
         requestHeader.setNamespace(namespace);
         requestHeader.setKey(key);
@@ -1327,9 +1371,8 @@ public class MQClientAPIImpl {
         }
     }
 
-
     public void deleteKVConfigValue(final String namespace, final String key, 
final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         DeleteKVConfigRequestHeader requestHeader = new 
DeleteKVConfigRequestHeader();
         requestHeader.setNamespace(namespace);
         requestHeader.setKey(key);
@@ -1356,9 +1399,8 @@ public class MQClientAPIImpl {
         }
     }
 
-
     public KVTable getKVListByNamespace(final String namespace, final long 
timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         GetKVListByNamespaceRequestHeader requestHeader = new 
GetKVListByNamespaceRequestHeader();
         requestHeader.setNamespace(namespace);
 
@@ -1377,17 +1419,15 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String 
addr, final String topic, final String group,
-                                                             final long 
timestamp, final boolean isForce, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        final long timestamp, final boolean isForce, final long timeoutMillis)
+        throws RemotingException, MQClientException, InterruptedException {
         return invokeBrokerToResetOffset(addr, topic, group, timestamp, 
isForce, timeoutMillis, false);
     }
 
-
     public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String 
addr, final String topic, final String group,
-                                                             final long 
timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
-            throws RemotingException, MQClientException, InterruptedException {
+        final long timestamp, final boolean isForce, final long timeoutMillis, 
boolean isC)
+        throws RemotingException, MQClientException, InterruptedException {
         ResetOffsetRequestHeader requestHeader = new 
ResetOffsetRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setGroup(group);
@@ -1400,7 +1440,7 @@ public class MQClientAPIImpl {
         }
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1416,9 +1456,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public Map<String, Map<MessageQueue, Long>> 
invokeBrokerToGetConsumerStatus(final String addr, final String topic, final 
String group,
-                                                                               
 final String clientAddr, final long timeoutMillis) throws RemotingException, 
MQClientException, InterruptedException {
+        final String clientAddr, final long timeoutMillis) throws 
RemotingException, MQClientException, InterruptedException {
         GetConsumerStatusRequestHeader requestHeader = new 
GetConsumerStatusRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setGroup(group);
@@ -1427,7 +1466,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS,
 requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1443,17 +1482,16 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public GroupList queryTopicConsumeByWho(final String addr, final String 
topic, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException,
-            MQBrokerException {
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException,
+        MQBrokerException {
         QueryTopicConsumeByWhoRequestHeader requestHeader = new 
QueryTopicConsumeByWhoRequestHeader();
         requestHeader.setTopic(topic);
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 GroupList groupList = GroupList.decode(response.getBody(), 
GroupList.class);
@@ -1466,10 +1504,9 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final 
String topic, final String group, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException,
-            MQBrokerException {
+        throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException,
+        MQBrokerException {
         QueryConsumeTimeSpanRequestHeader requestHeader = new 
QueryConsumeTimeSpanRequestHeader();
         requestHeader.setTopic(topic);
         requestHeader.setGroup(group);
@@ -1477,7 +1514,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 QueryConsumeTimeSpanBody consumeTimeSpanBody = 
GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
@@ -1490,9 +1527,8 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public TopicList getTopicsByCluster(final String cluster, final long 
timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         GetTopicsByClusterRequestHeader requestHeader = new 
GetTopicsByClusterRequestHeader();
         requestHeader.setCluster(cluster);
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_TOPICS_BY_CLUSTER, 
requestHeader);
@@ -1514,15 +1550,14 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public void registerMessageFilterClass(final String addr, //
-                                           final String consumerGroup, //
-                                           final String topic, //
-                                           final String className, //
-                                           final int classCRC, //
-                                           final byte[] classBody, //
-                                           final long timeoutMillis) throws 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
-            InterruptedException, MQBrokerException {
+        final String consumerGroup, //
+        final String topic, //
+        final String className, //
+        final int classCRC, //
+        final byte[] classBody, //
+        final long timeoutMillis) throws RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException,
+        InterruptedException, MQBrokerException {
         RegisterMessageFilterClassRequestHeader requestHeader = new 
RegisterMessageFilterClassRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         requestHeader.setClassName(className);
@@ -1543,7 +1578,6 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
     public TopicList getSystemTopicList(final long timeoutMillis) throws 
RemotingException, MQClientException, InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, 
null);
 
@@ -1555,7 +1589,7 @@ public class MQClientAPIImpl {
                 if (body != null) {
                     TopicList topicList = TopicList.decode(response.getBody(), 
TopicList.class);
                     if (topicList.getTopicList() != null && 
!topicList.getTopicList().isEmpty()
-                            && !UtilAll.isBlank(topicList.getBrokerAddr())) {
+                        && !UtilAll.isBlank(topicList.getBrokerAddr())) {
                         TopicList tmp = 
getSystemTopicListFromBroker(topicList.getBrokerAddr(), timeoutMillis);
                         if (tmp.getTopicList() != null && 
!tmp.getTopicList().isEmpty()) {
                             
topicList.getTopicList().addAll(tmp.getTopicList());
@@ -1571,13 +1605,12 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public TopicList getSystemTopicListFromBroker(final String addr, final 
long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER,
 null);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1594,12 +1627,11 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public boolean cleanExpiredConsumeQueue(final String addr, long 
timeoutMillis) throws MQClientException, RemotingConnectException,
-            RemotingSendRequestException, RemotingTimeoutException, 
InterruptedException {
+        RemotingSendRequestException, RemotingTimeoutException, 
InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, 
null);
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 return true;
@@ -1611,12 +1643,11 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public boolean cleanUnusedTopicByAddr(final String addr, long 
timeoutMillis) throws MQClientException, RemotingConnectException,
-            RemotingSendRequestException, RemotingTimeoutException, 
InterruptedException {
+        RemotingSendRequestException, RemotingTimeoutException, 
InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null);
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
                 return true;
@@ -1629,7 +1660,7 @@ public class MQClientAPIImpl {
     }
 
     public ConsumerRunningInfo getConsumerRunningInfo(final String addr, 
String consumerGroup, String clientId, boolean jstack,
-                                                      final long 
timeoutMillis) throws RemotingException, MQClientException, 
InterruptedException {
+        final long timeoutMillis) throws RemotingException, MQClientException, 
InterruptedException {
         GetConsumerRunningInfoRequestHeader requestHeader = new 
GetConsumerRunningInfoRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         requestHeader.setClientId(clientId);
@@ -1638,7 +1669,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1656,10 +1687,10 @@ public class MQClientAPIImpl {
     }
 
     public ConsumeMessageDirectlyResult consumeMessageDirectly(final String 
addr, //
-                                                               String 
consumerGroup, //
-                                                               String 
clientId, //
-                                                               String msgId, //
-                                                               final long 
timeoutMillis) throws RemotingException, MQClientException, 
InterruptedException {
+        String consumerGroup, //
+        String clientId, //
+        String msgId, //
+        final long timeoutMillis) throws RemotingException, MQClientException, 
InterruptedException {
         ConsumeMessageDirectlyResultRequestHeader requestHeader = new 
ConsumeMessageDirectlyResultRequestHeader();
         requestHeader.setConsumerGroup(consumerGroup);
         requestHeader.setClientId(clientId);
@@ -1668,7 +1699,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1686,8 +1717,8 @@ public class MQClientAPIImpl {
     }
 
     public Map<Integer, Long> queryCorrectionOffset(final String addr, final 
String topic, final String group, Set<String> filterGroup,
-                                                    long timeoutMillis) throws 
MQClientException, RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
-            InterruptedException {
+        long timeoutMillis) throws MQClientException, 
RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException,
+        InterruptedException {
         QueryCorrectionOffsetHeader requestHeader = new 
QueryCorrectionOffsetHeader();
         requestHeader.setCompareGroup(group);
         requestHeader.setTopic(topic);
@@ -1703,7 +1734,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_CORRECTION_OFFSET, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1720,7 +1751,7 @@ public class MQClientAPIImpl {
     }
 
     public TopicList getUnitTopicList(final boolean containRetry, final long 
timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_UNIT_TOPIC_LIST, null);
 
         RemotingCommand response = this.remotingClient.invokeSync(null, 
request, timeoutMillis);
@@ -1749,9 +1780,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public TopicList getHasUnitSubTopicList(final boolean containRetry, final 
long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST, 
null);
 
         RemotingCommand response = this.remotingClient.invokeSync(null, 
request, timeoutMillis);
@@ -1779,9 +1809,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public TopicList getHasUnitSubUnUnitTopicList(final boolean containRetry, 
final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
+        throws RemotingException, MQClientException, InterruptedException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST,
 null);
 
         RemotingCommand response = this.remotingClient.invokeSync(null, 
request, timeoutMillis);
@@ -1809,9 +1838,8 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public void cloneGroupOffset(final String addr, final String srcGroup, 
final String destGroup, final String topic,
-                                 final boolean isOffline, final long 
timeoutMillis) throws RemotingException, MQClientException, 
InterruptedException {
+        final boolean isOffline, final long timeoutMillis) throws 
RemotingException, MQClientException, InterruptedException {
         CloneGroupOffsetRequestHeader requestHeader = new 
CloneGroupOffsetRequestHeader();
         requestHeader.setSrcGroup(srcGroup);
         requestHeader.setDestGroup(destGroup);
@@ -1820,7 +1848,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.CLONE_GROUP_OFFSET, 
requestHeader);
 
         RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
+            request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1833,10 +1861,9 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public BrokerStatsData viewBrokerStatsData(String brokerAddr, String 
statsName, String statsKey, long timeoutMillis)
-            throws MQClientException, RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException,
-            InterruptedException {
+        throws MQClientException, RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException,
+        InterruptedException {
         ViewBrokerStatsDataRequestHeader requestHeader = new 
ViewBrokerStatsDataRequestHeader();
         requestHeader.setStatsName(statsName);
         requestHeader.setStatsKey(statsKey);
@@ -1844,7 +1871,7 @@ public class MQClientAPIImpl {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.VIEW_BROKER_STATS_DATA, 
requestHeader);
 
         RemotingCommand response = this.remotingClient
-                
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
brokerAddr), request, timeoutMillis);
+            
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
brokerAddr), request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1860,23 +1887,21 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public Set<String> getClusterList(String topic, long timeoutMillis) throws 
MQClientException, RemotingConnectException,
-            RemotingSendRequestException, RemotingTimeoutException, 
InterruptedException {
+        RemotingSendRequestException, RemotingTimeoutException, 
InterruptedException {
         // todo:jodie
         return Collections.EMPTY_SET;
     }
 
-
     public ConsumeStatsList fetchConsumeStatsInBroker(String brokerAddr, 
boolean isOrder, long timeoutMillis) throws MQClientException,
-            RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException {
+        RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException {
         GetConsumeStatsInBrokerHeader requestHeader = new 
GetConsumeStatsInBrokerHeader();
         requestHeader.setIsOrder(isOrder);
 
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONSUME_STATS, 
requestHeader);
 
         RemotingCommand response = this.remotingClient
-                
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
brokerAddr), request, timeoutMillis);
+            
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
brokerAddr), request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1892,12 +1917,11 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-
     public SubscriptionGroupWrapper getAllSubscriptionGroup(final String 
brokerAddr, long timeoutMillis) throws InterruptedException,
-            RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQBrokerException {
+        RemotingTimeoutException, RemotingSendRequestException, 
RemotingConnectException, MQBrokerException {
         RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG,
 null);
         RemotingCommand response = this.remotingClient
-                
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
brokerAddr), request, timeoutMillis);
+            
.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), 
brokerAddr), request, timeoutMillis);
         assert response != null;
         switch (response.getCode()) {
             case ResponseCode.SUCCESS: {
@@ -1909,13 +1933,12 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
-
   

<TRUNCATED>

Reply via email to