http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java
----------------------------------------------------------------------
diff --git 
a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java 
b/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java
deleted file mode 100644
index 3d5ba28..0000000
--- a/client/src/main/java/com/alibaba/rocketmq/client/impl/MQClientAPIImpl.java
+++ /dev/null
@@ -1,1996 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alibaba.rocketmq.client.impl;
-
-import com.alibaba.rocketmq.client.ClientConfig;
-import com.alibaba.rocketmq.client.consumer.PullCallback;
-import com.alibaba.rocketmq.client.consumer.PullResult;
-import com.alibaba.rocketmq.client.consumer.PullStatus;
-import com.alibaba.rocketmq.client.exception.MQBrokerException;
-import com.alibaba.rocketmq.client.exception.MQClientException;
-import com.alibaba.rocketmq.client.hook.SendMessageContext;
-import com.alibaba.rocketmq.client.impl.consumer.PullResultExt;
-import com.alibaba.rocketmq.client.impl.factory.MQClientInstance;
-import com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl;
-import com.alibaba.rocketmq.client.impl.producer.TopicPublishInfo;
-import com.alibaba.rocketmq.client.log.ClientLogger;
-import com.alibaba.rocketmq.client.producer.SendCallback;
-import com.alibaba.rocketmq.client.producer.SendResult;
-import com.alibaba.rocketmq.client.producer.SendStatus;
-import com.alibaba.rocketmq.common.MQVersion;
-import com.alibaba.rocketmq.common.MixAll;
-import com.alibaba.rocketmq.common.TopicConfig;
-import com.alibaba.rocketmq.common.UtilAll;
-import com.alibaba.rocketmq.common.admin.ConsumeStats;
-import com.alibaba.rocketmq.common.admin.TopicStatsTable;
-import com.alibaba.rocketmq.common.message.*;
-import com.alibaba.rocketmq.common.namesrv.TopAddressing;
-import com.alibaba.rocketmq.common.protocol.RequestCode;
-import com.alibaba.rocketmq.common.protocol.ResponseCode;
-import com.alibaba.rocketmq.common.protocol.body.*;
-import com.alibaba.rocketmq.common.protocol.header.*;
-import 
com.alibaba.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
-import com.alibaba.rocketmq.common.protocol.header.namesrv.*;
-import com.alibaba.rocketmq.common.protocol.heartbeat.HeartbeatData;
-import com.alibaba.rocketmq.common.protocol.route.TopicRouteData;
-import com.alibaba.rocketmq.common.subscription.SubscriptionGroupConfig;
-import com.alibaba.rocketmq.remoting.InvokeCallback;
-import com.alibaba.rocketmq.remoting.RPCHook;
-import com.alibaba.rocketmq.remoting.RemotingClient;
-import com.alibaba.rocketmq.remoting.exception.*;
-import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
-import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient;
-import com.alibaba.rocketmq.remoting.netty.ResponseFuture;
-import com.alibaba.rocketmq.remoting.protocol.LanguageCode;
-import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
-import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
-import org.slf4j.Logger;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-
-/**
- * @author shijia.wxr
- */
-public class MQClientAPIImpl {
-
-    /** Client-side logger obtained from {@link ClientLogger#getLog()}. */
-    private static final Logger log = ClientLogger.getLog();
-
-    /**
-     * Selects the wire format used by {@code sendMessage}: when {@code true} the V2 request
-     * header and the {@code SEND_MESSAGE_V2} request code are used. The value is read once,
-     * at class initialisation, from the system property
-     * {@code com.alibaba.rocketmq.client.sendSmartMsg} and defaults to {@code "true"}.
-     */
-    public static boolean sendSmartMsg;
-
-    static {
-        sendSmartMsg = Boolean.parseBoolean(
-                System.getProperty("com.alibaba.rocketmq.client.sendSmartMsg", "true"));
-
-        // Publish the current client version under the remoting version key.
-        System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
-    }
-
-    /** Transport used for every request issued by this class. */
-    private final RemotingClient remotingClient;
-    /** Source from which {@link #fetchNameServerAddr()} obtains the name server address string. */
-    private final TopAddressing topAddressing;
-    /** Processor registered with the remoting client in the constructor. */
-    private final ClientRemotingProcessor clientRemotingProcessor;
-    /** Last name server address string accepted by {@link #fetchNameServerAddr()}; {@code null} until then. */
-    private String nameSrvAddr = null;
-    /** Client configuration; consulted for the unit name and the VIP-channel switch. */
-    private ClientConfig clientConfig;
-
-    public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final 
ClientRemotingProcessor clientRemotingProcessor,
-                           RPCHook rpcHook, final ClientConfig clientConfig) {
-        this.clientConfig = clientConfig;
-        topAddressing = new TopAddressing(MixAll.WS_ADDR, 
clientConfig.getUnitName());
-        this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
-        this.clientRemotingProcessor = clientRemotingProcessor;
-
-        this.remotingClient.registerRPCHook(rpcHook);
-        
this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, 
this.clientRemotingProcessor, null);
-
-        
this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, 
this.clientRemotingProcessor, null);
-
-        
this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, 
this.clientRemotingProcessor, null);
-
-        
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT,
 this.clientRemotingProcessor, null);
-
-        
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, 
this.clientRemotingProcessor, null);
-
-        
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, 
this.clientRemotingProcessor, null);
-    }
-
-    /**
-     * @return the name server address list as reported by the underlying remoting client
-     */
-    public List<String> getNameServerAddressList() {
-        final List<String> nameServers = remotingClient.getNameServerAddressList();
-        return nameServers;
-    }
-
-    /**
-     * @return the remoting client created in the constructor
-     */
-    public RemotingClient getRemotingClient() {
-        return this.remotingClient;
-    }
-
-    /**
-     * Asks {@code topAddressing} for the name server address string and, when it is non-null
-     * and differs from the cached one, pushes it to the remoting client and caches it.
-     * Any exception raised on the way is logged and otherwise ignored.
-     *
-     * @return the cached name server address string after the attempt (may be {@code null})
-     */
-    public String fetchNameServerAddr() {
-        try {
-            final String fetched = this.topAddressing.fetchNSAddr();
-            final boolean changed = fetched != null && !fetched.equals(this.nameSrvAddr);
-            if (changed) {
-                log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + fetched);
-                this.updateNameServerAddressList(fetched);
-                // Cache only after the remoting client accepted the new list.
-                this.nameSrvAddr = fetched;
-            }
-        } catch (Exception e) {
-            log.error("fetchNameServerAddr Exception", e);
-        }
-        return this.nameSrvAddr;
-    }
-
-    /**
-     * Splits {@code addrs} on {@code ';'} and hands the resulting list to the remoting client.
-     *
-     * @param addrs semicolon-separated address string; must not be {@code null}
-     */
-    public void updateNameServerAddressList(final String addrs) {
-        // String.split never returns null, so the list is always forwarded.
-        final String[] parts = addrs.split(";");
-        final List<String> servers = new ArrayList<String>(Arrays.asList(parts));
-        this.remotingClient.updateNameServerAddressList(servers);
-    }
-
-    /** Starts the underlying remoting client. */
-    public void start() {
-        remotingClient.start();
-    }
-
-    /** Shuts down the underlying remoting client. */
-    public void shutdown() {
-        remotingClient.shutdown();
-    }
-
-    /**
-     * Sends an {@code UPDATE_AND_CREATE_SUBSCRIPTIONGROUP} request whose body is the encoded
-     * {@code config} and waits synchronously for the answer.
-     *
-     * @param addr          broker address; passed through {@code MixAll.brokerVIPChannel} first
-     * @param config        subscription group configuration to send
-     * @param timeoutMillis timeout for the synchronous call
-     * @throws MQClientException when the response code is anything other than {@code SUCCESS}
-     */
-    public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
-        request.setBody(RemotingSerializable.encode(config));
-
-        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQClientException(response.getCode(), response.getRemark());
-        }
-    }
-
-    /**
-     * Sends an {@code UPDATE_AND_CREATE_TOPIC} request built from {@code topicConfig} and waits
-     * synchronously for the answer.
-     *
-     * @param addr          broker address; passed through {@code MixAll.brokerVIPChannel} first
-     * @param defaultTopic  value copied into the request header's default-topic field
-     * @param topicConfig   source of the topic name, queue counts, permission, filter type,
-     *                      system flag and order flag
-     * @param timeoutMillis timeout for the synchronous call
-     * @throws MQClientException when the response code is anything other than {@code SUCCESS}
-     */
-    public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        final CreateTopicRequestHeader header = new CreateTopicRequestHeader();
-        header.setTopic(topicConfig.getTopicName());
-        header.setDefaultTopic(defaultTopic);
-        header.setReadQueueNums(topicConfig.getReadQueueNums());
-        header.setWriteQueueNums(topicConfig.getWriteQueueNums());
-        header.setPerm(topicConfig.getPerm());
-        header.setTopicFilterType(topicConfig.getTopicFilterType().name());
-        header.setTopicSysFlag(topicConfig.getTopicSysFlag());
-        header.setOrder(topicConfig.isOrder());
-
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, header);
-
-        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQClientException(response.getCode(), response.getRemark());
-        }
-    }
-
-    /**
-     * Convenience overload of the twelve-argument {@code sendMessage}: delegates with no send
-     * callback, no topic publish info, no client instance and a retry count of {@code 0}.
-     *
-     * @return whatever the twelve-argument overload returns for the given communication mode
-     */
-    public SendResult sendMessage(
-            final String addr,
-            final String brokerName,
-            final Message msg,
-            final SendMessageRequestHeader requestHeader,
-            final long timeoutMillis,
-            final CommunicationMode communicationMode,
-            final SendMessageContext context,
-            final DefaultMQProducerImpl producer
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        return this.sendMessage(addr, brokerName, msg, requestHeader, timeoutMillis, communicationMode,
-                null, null, null, 0, context, producer);
-    }
-
-    /**
-     * Builds the send request (V2 header when {@link #sendSmartMsg} is set, plain header
-     * otherwise), attaches the message body and dispatches it according to
-     * {@code communicationMode}.
-     *
-     * @return the send result for {@code SYNC}; {@code null} for {@code ONEWAY}, {@code ASYNC}
-     *         and any unrecognised mode
-     */
-    public SendResult sendMessage(
-            final String addr,
-            final String brokerName,
-            final Message msg,
-            final SendMessageRequestHeader requestHeader,
-            final long timeoutMillis,
-            final CommunicationMode communicationMode,
-            final SendCallback sendCallback,
-            final TopicPublishInfo topicPublishInfo,
-            final MQClientInstance instance,
-            final int retryTimesWhenSendFailed,
-            final SendMessageContext context,
-            final DefaultMQProducerImpl producer
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand request;
-        if (sendSmartMsg) {
-            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2,
-                    SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader));
-        } else {
-            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
-        }
-        request.setBody(msg.getBody());
-
-        switch (communicationMode) {
-            case SYNC:
-                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
-            case ASYNC:
-                // A fresh counter tracks how many attempts the async path has made.
-                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo,
-                        instance, retryTimesWhenSendFailed, new AtomicInteger(), context, producer);
-                return null;
-            case ONEWAY:
-                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
-                return null;
-            default:
-                assert false;
-                return null;
-        }
-    }
-
-    private SendResult sendMessageSync(//
-                                       final String addr, //
-                                       final String brokerName, //
-                                       final Message msg, //
-                                       final long timeoutMillis, //
-                                       final RemotingCommand request//
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        RemotingCommand response = this.remotingClient.invokeSync(addr, 
request, timeoutMillis);
-        assert response != null;
-        return this.processSendResponse(brokerName, msg, response);
-    }
-
-    /**
-     * Sends {@code request} asynchronously and handles the outcome in the invoke callback.
-     * <ul>
-     * <li>Response present, no {@code sendCallback}: the response is processed, the
-     * after-send hook is run when a context exists, and the broker is recorded as healthy.</li>
-     * <li>Response present, with {@code sendCallback}: on success the hook and
-     * {@code onSuccess} are invoked; if processing throws, the broker is recorded as faulty
-     * and {@code onExceptionImpl} is called without retry.</li>
-     * <li>No response: the broker is recorded as faulty and {@code onExceptionImpl} is called
-     * with retry enabled.</li>
-     * </ul>
-     * The caller's {@code timeoutMillis} is forwarded to {@code onExceptionImpl} so that a
-     * retried request gets a usable timeout (previously {@code 0L} was forwarded).
-     */
-    private void sendMessageAsync(//
-                                  final String addr, //
-                                  final String brokerName, //
-                                  final Message msg, //
-                                  final long timeoutMillis, //
-                                  final RemotingCommand request, //
-                                  final SendCallback sendCallback, //
-                                  final TopicPublishInfo topicPublishInfo, //
-                                  final MQClientInstance instance, //
-                                  final int retryTimesWhenSendFailed, //
-                                  final AtomicInteger times, //
-                                  final SendMessageContext context, //
-                                  final DefaultMQProducerImpl producer //
-    ) throws InterruptedException, RemotingException {
-        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
-            @Override
-            public void operationComplete(ResponseFuture responseFuture) {
-                RemotingCommand response = responseFuture.getResponseCommand();
-                if (null == sendCallback && response != null) {
-
-                    try {
-                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
-                        if (context != null && sendResult != null) {
-                            context.setSendResult(sendResult);
-                            context.getProducer().executeSendMessageHookAfter(context);
-                        }
-                    } catch (Throwable e) {
-                        // Best effort: nobody is waiting for the result, but leave a trace.
-                        log.warn("process async send response failed, no SendCallback registered. brokerName=" + brokerName, e);
-                    }
-
-                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
-                    return;
-                }
-
-                if (response != null) {
-                    try {
-                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
-                        assert sendResult != null;
-                        if (context != null) {
-                            context.setSendResult(sendResult);
-                            context.getProducer().executeSendMessageHookAfter(context);
-                        }
-
-                        try {
-                            sendCallback.onSuccess(sendResult);
-                        } catch (Throwable e) {
-                            // A failing user callback must not turn a successful send into a failure.
-                            log.warn("SendCallback.onSuccess threw an exception. brokerName=" + brokerName, e);
-                        }
-
-                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
-                    } catch (Exception e) {
-                        producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
-                        onExceptionImpl(brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, e, context, false, producer);
-                    }
-                } else {
-                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
-                    if (!responseFuture.isSendRequestOK()) {
-                        MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
-                        onExceptionImpl(brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, ex, context, true, producer);
-                    } else if (responseFuture.isTimeout()) {
-                        MQClientException ex = new MQClientException("wait response timeout " + responseFuture.getTimeoutMillis() + "ms",
-                                responseFuture.getCause());
-                        onExceptionImpl(brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, ex, context, true, producer);
-                    } else {
-                        MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
-                        onExceptionImpl(brokerName, msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
-                                retryTimesWhenSendFailed, times, ex, context, true, producer);
-                    }
-                }
-            }
-        });
-    }
-
-
-    /**
-     * Handles a failed asynchronous send: either retries on another queue or reports the
-     * failure.
-     * <p>
-     * The attempt counter {@code curTimes} is incremented first. When {@code needRetry} is set
-     * and the incremented value does not exceed {@code timesTotal}, a queue is selected via
-     * the producer, the request gets a new opaque id and is sent again; a failure to submit
-     * the retry re-enters this method. Otherwise the exception is stored in the context (if
-     * any), the after-send hook is run and {@code sendCallback.onException} is invoked.
-     * <p>
-     * A {@code null} callback is skipped with a warning instead of relying on a swallowed
-     * {@code NullPointerException}, and exceptions thrown by the callback are logged.
-     *
-     * @param timeoutMillis timeout used for the retried request
-     * @param timesTotal    maximum number of retries
-     * @param curTimes      shared attempt counter
-     * @param e             failure that triggered this call
-     * @param needRetry     whether this failure is eligible for a retry
-     */
-    private void onExceptionImpl(final String brokerName, //
-                                 final Message msg, //
-                                 final long timeoutMillis, //
-                                 final RemotingCommand request, //
-                                 final SendCallback sendCallback, //
-                                 final TopicPublishInfo topicPublishInfo, //
-                                 final MQClientInstance instance, //
-                                 final int timesTotal, //
-                                 final AtomicInteger curTimes, //
-                                 final Exception e, //
-                                 final SendMessageContext context, //
-                                 final boolean needRetry, //
-                                 final DefaultMQProducerImpl producer // 12
-    ) {
-        int tmp = curTimes.incrementAndGet();
-        if (needRetry && tmp <= timesTotal) {
-            MessageQueue tmpmq = producer.selectOneMessageQueue(topicPublishInfo, brokerName);
-            String addr = instance.findBrokerAddressInPublish(tmpmq.getBrokerName());
-            log.info("async send msg by retry {} times. topic={}, brokerAddr={}, brokerName={}", tmp, msg.getTopic(), addr,
-                    tmpmq.getBrokerName());
-            try {
-                // The same command object is re-sent, so it needs a fresh request id.
-                request.setOpaque(RemotingCommand.createNewRequestId());
-                sendMessageAsync(addr, tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance,
-                        timesTotal, curTimes, context, producer);
-            } catch (InterruptedException e1) {
-                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
-                        context, false, producer);
-            } catch (RemotingConnectException e1) {
-                producer.updateFaultItem(brokerName, 3000, true);
-                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
-                        context, true, producer);
-            } catch (RemotingTooMuchRequestException e1) {
-                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
-                        context, false, producer);
-            } catch (RemotingException e1) {
-                producer.updateFaultItem(brokerName, 3000, true);
-                onExceptionImpl(tmpmq.getBrokerName(), msg, timeoutMillis, request, sendCallback, topicPublishInfo, instance, timesTotal, curTimes, e1,
-                        context, true, producer);
-            }
-        } else {
-            if (context != null) {
-                context.setException(e);
-                context.getProducer().executeSendMessageHookAfter(context);
-            }
-            if (sendCallback == null) {
-                // Nobody to notify; keep the failure visible in the log.
-                log.warn("async send failed and no SendCallback is registered. brokerName=" + brokerName, e);
-                return;
-            }
-            try {
-                sendCallback.onException(e);
-            } catch (Exception e2) {
-                log.warn("SendCallback.onException threw an exception. brokerName=" + brokerName, e2);
-            }
-        }
-    }
-
-
-    private SendResult processSendResponse(//
-                                           final String brokerName, //
-                                           final Message msg, //
-                                           final RemotingCommand response//
-    ) throws MQBrokerException, RemotingCommandException {
-        switch (response.getCode()) {
-            case ResponseCode.FLUSH_DISK_TIMEOUT:
-            case ResponseCode.FLUSH_SLAVE_TIMEOUT:
-            case ResponseCode.SLAVE_NOT_AVAILABLE: {
-                // TODO LOG
-            }
-            case ResponseCode.SUCCESS: {
-                SendStatus sendStatus = SendStatus.SEND_OK;
-                switch (response.getCode()) {
-                    case ResponseCode.FLUSH_DISK_TIMEOUT:
-                        sendStatus = SendStatus.FLUSH_DISK_TIMEOUT;
-                        break;
-                    case ResponseCode.FLUSH_SLAVE_TIMEOUT:
-                        sendStatus = SendStatus.FLUSH_SLAVE_TIMEOUT;
-                        break;
-                    case ResponseCode.SLAVE_NOT_AVAILABLE:
-                        sendStatus = SendStatus.SLAVE_NOT_AVAILABLE;
-                        break;
-                    case ResponseCode.SUCCESS:
-                        sendStatus = SendStatus.SEND_OK;
-                        break;
-                    default:
-                        assert false;
-                        break;
-                }
-
-                SendMessageResponseHeader responseHeader =
-                        (SendMessageResponseHeader) 
response.decodeCommandCustomHeader(SendMessageResponseHeader.class);
-
-                MessageQueue messageQueue = new MessageQueue(msg.getTopic(), 
brokerName, responseHeader.getQueueId());
-
-                SendResult sendResult = new SendResult(sendStatus,
-                        MessageClientIDSetter.getUniqID(msg),
-                        responseHeader.getMsgId(), messageQueue, 
responseHeader.getQueueOffset());
-                sendResult.setTransactionId(responseHeader.getTransactionId());
-                String regionId = 
response.getExtFields().get(MessageConst.PROPERTY_MSG_REGION);
-                String traceOn = 
response.getExtFields().get(MessageConst.PROPERTY_TRACE_SWITCH);
-                if (regionId == null || regionId.isEmpty()) {
-                    regionId = MixAll.DEFAULT_TRACE_REGION_ID;
-                }
-                if (traceOn != null && traceOn.equals("false")) {
-                    sendResult.setTraceOn(false);
-                } else {
-                    sendResult.setTraceOn(true);
-                }
-                sendResult.setRegionId(regionId);
-                return sendResult;
-            }
-            default:
-                break;
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Builds a {@code PULL_MESSAGE} request and dispatches it according to
-     * {@code communicationMode}.
-     *
-     * @return the pull result for {@code SYNC}; {@code null} for {@code ASYNC} (the result is
-     *         delivered to {@code pullCallback}) and for {@code ONEWAY} or any other mode,
-     *         which additionally trip an assertion
-     */
-    public PullResult pullMessage(
-            final String addr,
-            final PullMessageRequestHeader requestHeader,
-            final long timeoutMillis,
-            final CommunicationMode communicationMode,
-            final PullCallback pullCallback
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
-
-        switch (communicationMode) {
-            case SYNC:
-                return this.pullMessageSync(addr, request, timeoutMillis);
-            case ASYNC:
-                this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
-                return null;
-            case ONEWAY:
-            default:
-                assert false;
-                return null;
-        }
-    }
-
-
-    /**
-     * Sends {@code request} asynchronously and reports the outcome to {@code pullCallback}.
-     * A present response is converted with {@code processPullResponse} and passed to
-     * {@code onSuccess}; any exception raised while doing so (including one thrown by
-     * {@code onSuccess} itself) is passed to {@code onException}. A missing response is
-     * reported to {@code onException} as an {@link MQClientException} describing the cause.
-     */
-    private void pullMessageAsync(
-            final String addr,
-            final RemotingCommand request,
-            final long timeoutMillis,
-            final PullCallback pullCallback
-    ) throws RemotingException, InterruptedException {
-        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
-            @Override
-            public void operationComplete(ResponseFuture responseFuture) {
-                final RemotingCommand response = responseFuture.getResponseCommand();
-                if (response == null) {
-                    final String reason;
-                    if (!responseFuture.isSendRequestOK()) {
-                        reason = "send request failed";
-                    } else if (responseFuture.isTimeout()) {
-                        reason = "wait response timeout " + responseFuture.getTimeoutMillis() + "ms";
-                    } else {
-                        reason = "unknow reseaon";
-                    }
-                    pullCallback.onException(new MQClientException(reason, responseFuture.getCause()));
-                    return;
-                }
-
-                try {
-                    final PullResult pulled = MQClientAPIImpl.this.processPullResponse(response);
-                    assert pulled != null;
-                    pullCallback.onSuccess(pulled);
-                } catch (Exception e) {
-                    pullCallback.onException(e);
-                }
-            }
-        });
-    }
-
-    private PullResult pullMessageSync(//
-                                       final String addr, // 1
-                                       final RemotingCommand request, // 2
-                                       final long timeoutMillis// 3
-    ) throws RemotingException, InterruptedException, MQBrokerException {
-        RemotingCommand response = this.remotingClient.invokeSync(addr, 
request, timeoutMillis);
-        assert response != null;
-        return this.processPullResponse(response);
-    }
-
-    private PullResult processPullResponse(final RemotingCommand response) 
throws MQBrokerException, RemotingCommandException {
-        PullStatus pullStatus = PullStatus.NO_NEW_MSG;
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS:
-                pullStatus = PullStatus.FOUND;
-                break;
-            case ResponseCode.PULL_NOT_FOUND:
-                pullStatus = PullStatus.NO_NEW_MSG;
-                break;
-            case ResponseCode.PULL_RETRY_IMMEDIATELY:
-                pullStatus = PullStatus.NO_MATCHED_MSG;
-                break;
-            case ResponseCode.PULL_OFFSET_MOVED:
-                pullStatus = PullStatus.OFFSET_ILLEGAL;
-                break;
-
-            default:
-                throw new MQBrokerException(response.getCode(), 
response.getRemark());
-        }
-
-        PullMessageResponseHeader responseHeader =
-                (PullMessageResponseHeader) 
response.decodeCommandCustomHeader(PullMessageResponseHeader.class);
-
-        return new PullResultExt(pullStatus, 
responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(),
-                responseHeader.getMaxOffset(), null, 
responseHeader.getSuggestWhichBrokerId(), response.getBody());
-    }
-
-    /**
-     * Sends a {@code VIEW_MESSAGE_BY_ID} request for the given offset and decodes the
-     * response body into a message.
-     *
-     * @param addr          broker address; passed through {@code MixAll.brokerVIPChannel} first
-     * @param phyoffset     offset placed in the request header
-     * @param timeoutMillis timeout for the synchronous call
-     * @return the message decoded from the response body
-     * @throws MQBrokerException when the response code is anything other than {@code SUCCESS}
-     */
-    public MessageExt viewMessage(final String addr, final long phyoffset, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
-        final ViewMessageRequestHeader header = new ViewMessageRequestHeader();
-        header.setOffset(phyoffset);
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.VIEW_MESSAGE_BY_ID, header);
-
-        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            final ByteBuffer payload = ByteBuffer.wrap(response.getBody());
-            return MessageDecoder.clientDecode(payload, true);
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a {@code SEARCH_OFFSET_BY_TIMESTAMP} request for the given topic, queue and
-     * timestamp.
-     *
-     * @param addr          broker address; passed through {@code MixAll.brokerVIPChannel} first
-     * @param timeoutMillis timeout for the synchronous call
-     * @return the offset carried in the response header
-     * @throws MQBrokerException when the response code is anything other than {@code SUCCESS}
-     */
-    public long searchOffset(final String addr, final String topic, final int queueId, final long timestamp, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
-        final SearchOffsetRequestHeader header = new SearchOffsetRequestHeader();
-        header.setTopic(topic);
-        header.setQueueId(queueId);
-        header.setTimestamp(timestamp);
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, header);
-
-        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            final SearchOffsetResponseHeader answer =
-                    (SearchOffsetResponseHeader) response.decodeCommandCustomHeader(SearchOffsetResponseHeader.class);
-            return answer.getOffset();
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a {@code GET_MAX_OFFSET} request for the given topic and queue.
-     *
-     * @param addr          broker address; passed through {@code MixAll.brokerVIPChannel} first
-     * @param timeoutMillis timeout for the synchronous call
-     * @return the offset carried in the response header
-     * @throws MQBrokerException when the response code is anything other than {@code SUCCESS}
-     */
-    public long getMaxOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
-        final GetMaxOffsetRequestHeader header = new GetMaxOffsetRequestHeader();
-        header.setTopic(topic);
-        header.setQueueId(queueId);
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, header);
-
-        final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            final GetMaxOffsetResponseHeader answer =
-                    (GetMaxOffsetResponseHeader) response.decodeCommandCustomHeader(GetMaxOffsetResponseHeader.class);
-            return answer.getOffset();
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a {@code GET_CONSUMER_LIST_BY_GROUP} request and returns the consumer id list decoded
-     * from the response body.
-     *
-     * @return the consumer id list held by the decoded response body
-     * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}, or if
-     *                           it is SUCCESS but the response has no body
-     */
-    public List<String> getConsumerIdListByGroup(
-            final String addr,
-            final String consumerGroup,
-            final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-            MQBrokerException, InterruptedException {
-        final GetConsumerListByGroupRequestHeader header = new GetConsumerListByGroupRequestHeader();
-        header.setConsumerGroup(consumerGroup);
-
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_LIST_BY_GROUP, header);
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            final byte[] payload = response.getBody();
-            if (payload != null) {
-                final GetConsumerListByGroupResponseBody decoded =
-                        GetConsumerListByGroupResponseBody.decode(payload, GetConsumerListByGroupResponseBody.class);
-                return decoded.getConsumerIdList();
-            }
-            // A SUCCESS response without a body deliberately ends in the exception below.
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a {@code GET_MIN_OFFSET} request for the given topic/queue and returns the offset
-     * carried in the response header.
-     *
-     * @return the offset reported by the response header
-     * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
-        final GetMinOffsetRequestHeader header = new GetMinOffsetRequestHeader();
-        header.setTopic(topic);
-        header.setQueueId(queueId);
-
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, header);
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-
-        final GetMinOffsetResponseHeader offsetHeader =
-                (GetMinOffsetResponseHeader) response.decodeCommandCustomHeader(GetMinOffsetResponseHeader.class);
-        return offsetHeader.getOffset();
-    }
-
-
-    /**
-     * Sends a {@code GET_EARLIEST_MSG_STORETIME} request for the given topic/queue and returns the
-     * timestamp carried in the response header.
-     *
-     * @return the timestamp reported by the response header
-     * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public long getEarliestMsgStoretime(final String addr, final String topic, final int queueId, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException {
-        final GetEarliestMsgStoretimeRequestHeader header = new GetEarliestMsgStoretimeRequestHeader();
-        header.setTopic(topic);
-        header.setQueueId(queueId);
-
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, header);
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-
-        final GetEarliestMsgStoretimeResponseHeader timeHeader = (GetEarliestMsgStoretimeResponseHeader)
-                response.decodeCommandCustomHeader(GetEarliestMsgStoretimeResponseHeader.class);
-        return timeHeader.getTimestamp();
-    }
-
-
-    /**
-     * Sends a {@code QUERY_CONSUMER_OFFSET} request built from the supplied header and returns
-     * the offset carried in the response header.
-     *
-     * @return the offset reported by the response header
-     * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public long queryConsumerOffset(
-            final String addr,
-            final QueryConsumerOffsetRequestHeader requestHeader,
-            final long timeoutMillis
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUMER_OFFSET, requestHeader);
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-
-        final QueryConsumerOffsetResponseHeader offsetHeader = (QueryConsumerOffsetResponseHeader)
-                response.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
-        return offsetHeader.getOffset();
-    }
-
-
-    /**
-     * Sends an {@code UPDATE_CONSUMER_OFFSET} request synchronously and returns normally only
-     * when the response code is {@code ResponseCode.SUCCESS}.
-     *
-     * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public void updateConsumerOffset(
-            final String addr,
-            final UpdateConsumerOffsetRequestHeader requestHeader,
-            final long timeoutMillis
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            return;
-        }
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends an {@code UPDATE_CONSUMER_OFFSET} request through {@code invokeOneway}; no response
-     * is read by this method.
-     */
-    public void updateConsumerOffsetOneway(
-            final String addr,
-            final UpdateConsumerOffsetRequestHeader requestHeader,
-            final long timeoutMillis
-    ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
-            RemotingSendRequestException, InterruptedException {
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-
-        this.remotingClient.invokeOneway(target, request, timeoutMillis);
-    }
-
-
-    /**
-     * Sends a {@code HEART_BEAT} request whose body is the encoded {@code heartbeatData} directly
-     * to {@code addr} and returns normally only on {@code ResponseCode.SUCCESS}.
-     *
-     * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public void sendHearbeat(
-            final String addr,
-            final HeartbeatData heartbeatData,
-            final long timeoutMillis
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
-        request.setBody(heartbeatData.encode());
-
-        final RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            return;
-        }
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends an {@code UNREGISTER_CLIENT} request carrying the client id and the producer and
-     * consumer group names directly to {@code addr}.
-     *
-     * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public void unregisterClient(
-            final String addr,
-            final String clientID,
-            final String producerGroup,
-            final String consumerGroup,
-            final long timeoutMillis
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final UnregisterClientRequestHeader header = new UnregisterClientRequestHeader();
-        header.setClientID(clientID);
-        header.setProducerGroup(producerGroup);
-        header.setConsumerGroup(consumerGroup);
-
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_CLIENT, header);
-        final RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            return;
-        }
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Builds an {@code END_TRANSACTION} request from the supplied header, attaches
-     * {@code remark} to it and sends it to {@code addr} through {@code invokeOneway}.
-     */
-    public void endTransactionOneway(
-            final String addr,
-            final EndTransactionRequestHeader requestHeader,
-            final String remark,
-            final long timeoutMillis
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand command =
-                RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);
-        command.setRemark(remark);
-
-        this.remotingClient.invokeOneway(addr, command, timeoutMillis);
-    }
-
-
-    /**
-     * Sends a {@code QUERY_MESSAGE} request asynchronously; the outcome is delivered to
-     * {@code invokeCallback}. The string form of {@code isUnqiueKey} is attached to the request
-     * as the {@code MixAll.UNIQUE_MSG_QUERY_FLAG} extension field.
-     */
-    public void queryMessage(
-            final String addr,
-            final QueryMessageRequestHeader requestHeader,
-            final long timeoutMillis,
-            final InvokeCallback invokeCallback,
-            final Boolean isUnqiueKey
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand command =
-                RemotingCommand.createRequestCommand(RequestCode.QUERY_MESSAGE, requestHeader);
-        command.addExtField(MixAll.UNIQUE_MSG_QUERY_FLAG, isUnqiueKey.toString());
-
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        this.remotingClient.invokeAsync(target, command, timeoutMillis, invokeCallback);
-    }
-
-
-    /**
-     * Sends a {@code HEART_BEAT} request whose body is the encoded {@code heartbeat} directly to
-     * {@code addr}.
-     *
-     * @return {@code true} if the response code equals {@code ResponseCode.SUCCESS},
-     *         {@code false} otherwise
-     */
-    public boolean registerClient(final String addr, final HeartbeatData heartbeat, final long timeoutMillis)
-            throws RemotingException, InterruptedException {
-        final RemotingCommand command = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
-        command.setBody(heartbeat.encode());
-
-        final RemotingCommand reply = this.remotingClient.invokeSync(addr, command, timeoutMillis);
-        final boolean succeeded = reply.getCode() == ResponseCode.SUCCESS;
-        return succeeded;
-    }
-
-
-    /**
-     * Sends a {@code CONSUMER_SEND_MSG_BACK} request describing {@code msg} (origin topic, commit
-     * log offset, message id) together with the consumer group, delay level and maximum
-     * reconsume times.
-     *
-     * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public void consumerSendMessageBack(
-            final String addr,
-            final MessageExt msg,
-            final String consumerGroup,
-            final int delayLevel,
-            final long timeoutMillis,
-            final int maxConsumeRetryTimes
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final ConsumerSendMsgBackRequestHeader header = new ConsumerSendMsgBackRequestHeader();
-        // The command is created before the header fields are filled in; that order is kept as-is.
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, header);
-
-        header.setGroup(consumerGroup);
-        header.setOriginTopic(msg.getTopic());
-        header.setOffset(msg.getCommitLogOffset());
-        header.setDelayLevel(delayLevel);
-        header.setOriginMsgId(msg.getMsgId());
-        header.setMaxReconsumeTimes(maxConsumeRetryTimes);
-
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            return;
-        }
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a {@code LOCK_BATCH_MQ} request whose body is the encoded {@code requestBody} and
-     * returns the "lock OK" queue set decoded from the response body.
-     *
-     * @return the set returned by {@code LockBatchResponseBody.getLockOKMQSet()}
-     * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public Set<MessageQueue> lockBatchMQ(
-            final String addr,
-            final LockBatchRequestBody requestBody,
-            final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
-        request.setBody(requestBody.encode());
-
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-
-        final LockBatchResponseBody decoded =
-                LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
-        return decoded.getLockOKMQSet();
-    }
-
-
-    /**
-     * Sends an {@code UNLOCK_BATCH_MQ} request whose body is the encoded {@code requestBody}.
-     * With {@code oneway} set, the request goes to {@code addr} via {@code invokeOneway} and no
-     * response is examined; otherwise it is sent synchronously (address passed through
-     * {@code MixAll.brokerVIPChannel}) and the response code is checked.
-     *
-     * @throws MQBrokerException in the synchronous case, if the response code is not
-     *                           {@code ResponseCode.SUCCESS}
-     */
-    public void unlockBatchMQ(
-            final String addr,
-            final UnlockBatchRequestBody requestBody,
-            final long timeoutMillis,
-            final boolean oneway
-    ) throws RemotingException, MQBrokerException, InterruptedException {
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
-        request.setBody(requestBody.encode());
-
-        if (oneway) {
-            // The one-way path uses the address exactly as given.
-            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
-            return;
-        }
-
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            return;
-        }
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a {@code GET_TOPIC_STATS_INFO} request for {@code topic} and returns the
-     * {@code TopicStatsTable} decoded from the response body.
-     *
-     * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public TopicStatsTable getTopicStatsInfo(final String addr, final String topic, final long timeoutMillis)
-            throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
-            RemotingConnectException, MQBrokerException {
-        final GetTopicStatsInfoRequestHeader header = new GetTopicStatsInfoRequestHeader();
-        header.setTopic(topic);
-
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, header);
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-        return TopicStatsTable.decode(response.getBody(), TopicStatsTable.class);
-    }
-
-
-    /**
-     * Convenience overload: delegates to the four-argument {@code getConsumeStats} with a
-     * {@code null} topic.
-     */
-    public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis)
-            throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
-            RemotingConnectException, MQBrokerException {
-        return this.getConsumeStats(addr, consumerGroup, null, timeoutMillis);
-    }
-
-
-    public ConsumeStats getConsumeStats(final String addr, final String 
consumerGroup, final String topic, final long timeoutMillis)
-            throws InterruptedException, RemotingTimeoutException, 
RemotingSendRequestException, RemotingConnectException,
-            MQBrokerException {
-        GetConsumeStatsRequestHeader requestHeader = new 
GetConsumeStatsRequestHeader();
-        requestHeader.setConsumerGroup(consumerGroup);
-        requestHeader.setTopic(topic);
-
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, 
requestHeader);
-
-        RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                ConsumeStats consumeStats = 
ConsumeStats.decode(response.getBody(), ConsumeStats.class);
-                return consumeStats;
-            }
-            default:
-                break;
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    public ProducerConnection getProducerConnectionList(final String addr, 
final String producerGroup, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException,
-            MQBrokerException {
-        GetProducerConnectionListRequestHeader requestHeader = new 
GetProducerConnectionListRequestHeader();
-        requestHeader.setProducerGroup(producerGroup);
-
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_PRODUCER_CONNECTION_LIST, 
requestHeader);
-
-        RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                return ProducerConnection.decode(response.getBody(), 
ProducerConnection.class);
-            }
-            default:
-                break;
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    public ConsumerConnection getConsumerConnectionList(final String addr, 
final String consumerGroup, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException,
-            MQBrokerException {
-        GetConsumerConnectionListRequestHeader requestHeader = new 
GetConsumerConnectionListRequestHeader();
-        requestHeader.setConsumerGroup(consumerGroup);
-
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST, 
requestHeader);
-
-        RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                ConsumerConnection consumerConnection = 
ConsumerConnection.decode(response.getBody(), ConsumerConnection.class);
-                return consumerConnection;
-            }
-            default:
-                break;
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a header-less {@code GET_BROKER_RUNTIME_INFO} request and returns the {@code KVTable}
-     * decoded from the response body.
-     *
-     * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public KVTable getBrokerRuntimeInfo(final String addr, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-            InterruptedException, MQBrokerException {
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null);
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-        return KVTable.decode(response.getBody(), KVTable.class);
-    }
-
-
-    /**
-     * Serializes {@code properties} with {@code MixAll.properties2String} and, when the result is
-     * a non-empty string, sends it as the body of an {@code UPDATE_BROKER_CONFIG} request.
-     * Nothing is sent when the serialized form is {@code null} or empty.
-     *
-     * @throws MQBrokerException if a request was sent and the response code is not
-     *                           {@code ResponseCode.SUCCESS}
-     */
-    public void updateBrokerConfig(final String addr, final Properties properties, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-            InterruptedException, MQBrokerException, UnsupportedEncodingException {
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_BROKER_CONFIG, null);
-
-        final String serialized = MixAll.properties2String(properties);
-        if (serialized == null || serialized.length() <= 0) {
-            // Nothing to push to the broker.
-            return;
-        }
-
-        request.setBody(serialized.getBytes(MixAll.DEFAULT_CHARSET));
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            return;
-        }
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a header-less {@code GET_BROKER_CONFIG} request directly to {@code addr} and parses
-     * the response body (decoded with {@code MixAll.DEFAULT_CHARSET}) into {@code Properties}
-     * via {@code MixAll.string2Properties}.
-     *
-     * @return the properties parsed from the response body
-     * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}, or if
-     *                           it is SUCCESS but the response carries no body
-     * @throws UnsupportedEncodingException if {@code MixAll.DEFAULT_CHARSET} is not supported
-     */
-    public Properties getBrokerConfig(final String addr, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-            InterruptedException, MQBrokerException, UnsupportedEncodingException {
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONFIG, null);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            final byte[] body = response.getBody();
-            // Guard the body: new String((byte[]) null, ...) would fail with an undeclared
-            // NullPointerException. A body-less SUCCESS is reported through the declared
-            // MQBrokerException instead, the same way getConsumerIdListByGroup handles it.
-            if (body != null) {
-                return MixAll.string2Properties(new String(body, MixAll.DEFAULT_CHARSET));
-            }
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-    /**
-     * Sends a header-less {@code GET_BROKER_CLUSTER_INFO} request with a {@code null} target
-     * address and returns the {@code ClusterInfo} decoded from the response body.
-     * NOTE(review): a null address presumably lets the remoting client pick a name server -
-     * confirm against the remoting client implementation.
-     *
-     * @throws MQBrokerException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public ClusterInfo getBrokerClusterInfo(final long timeoutMillis)
-            throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException,
-            RemotingConnectException, MQBrokerException {
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_INFO, null);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-        return ClusterInfo.decode(response.getBody(), ClusterInfo.class);
-    }
-
-
-    /**
-     * Sends a {@code GET_ROUTEINTO_BY_TOPIC} request for {@code topic} with a {@code null} target
-     * address and returns the {@code TopicRouteData} decoded from the response body.
-     *
-     * @throws MQClientException if the response code is not {@code ResponseCode.SUCCESS}
-     *                           (including {@code TOPIC_NOT_EXIST}), or if it is SUCCESS but the
-     *                           response has no body
-     */
-    public TopicRouteData getDefaultTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
-        header.setTopic(topic);
-
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, header);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
-        assert response != null;
-
-        // TOPIC_NOT_EXIST is not treated specially here (TODO LOG); like every other
-        // non-SUCCESS code it ends in the exception below.
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            final byte[] payload = response.getBody();
-            if (payload != null) {
-                return TopicRouteData.decode(payload, TopicRouteData.class);
-            }
-        }
-
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a {@code GET_ROUTEINTO_BY_TOPIC} request for {@code topic} with a {@code null} target
-     * address and returns the {@code TopicRouteData} decoded from the response body. On
-     * {@code TOPIC_NOT_EXIST} a warning is logged unless the topic equals
-     * {@code MixAll.DEFAULT_TOPIC}.
-     *
-     * @throws MQClientException if the response code is not {@code ResponseCode.SUCCESS}, or if
-     *                           it is SUCCESS but the response has no body
-     */
-    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
-        header.setTopic(topic);
-
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, header);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
-        assert response != null;
-
-        final int code = response.getCode();
-        if (code == ResponseCode.SUCCESS) {
-            final byte[] payload = response.getBody();
-            if (payload != null) {
-                return TopicRouteData.decode(payload, TopicRouteData.class);
-            }
-        } else if (code == ResponseCode.TOPIC_NOT_EXIST) {
-            if (!topic.equals(MixAll.DEFAULT_TOPIC)) {
-                log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
-            }
-        }
-
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a header-less {@code GET_ALL_TOPIC_LIST_FROM_NAMESERVER} request with a {@code null}
-     * target address and returns the {@code TopicList} decoded from the response body.
-     *
-     * @throws MQClientException if the response code is not {@code ResponseCode.SUCCESS}, or if
-     *                           it is SUCCESS but the response has no body
-     */
-    public TopicList getTopicListFromNameServer(final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER, null);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            final byte[] payload = response.getBody();
-            if (payload != null) {
-                return TopicList.decode(payload, TopicList.class);
-            }
-        }
-
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a {@code WIPE_WRITE_PERM_OF_BROKER} request naming {@code brokerName} to
-     * {@code namesrvAddr} and returns the wipe-topic count from the response header.
-     *
-     * @return the value of {@code getWipeTopicCount()} on the decoded response header
-     * @throws MQClientException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public int wipeWritePermOfBroker(final String namesrvAddr, String brokerName, final long timeoutMillis)
-            throws RemotingCommandException, RemotingConnectException, RemotingSendRequestException,
-            RemotingTimeoutException, InterruptedException, MQClientException {
-        final WipeWritePermOfBrokerRequestHeader header = new WipeWritePermOfBrokerRequestHeader();
-        header.setBrokerName(brokerName);
-
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.WIPE_WRITE_PERM_OF_BROKER, header);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQClientException(response.getCode(), response.getRemark());
-        }
-
-        final WipeWritePermOfBrokerResponseHeader wipeHeader = (WipeWritePermOfBrokerResponseHeader)
-                response.decodeCommandCustomHeader(WipeWritePermOfBrokerResponseHeader.class);
-        return wipeHeader.getWipeTopicCount();
-    }
-
-
-    /**
-     * Sends a {@code DELETE_TOPIC_IN_BROKER} request for {@code topic}; the target address is
-     * passed through {@code MixAll.brokerVIPChannel} first.
-     *
-     * @throws MQClientException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        final DeleteTopicRequestHeader header = new DeleteTopicRequestHeader();
-        header.setTopic(topic);
-
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, header);
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            return;
-        }
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a {@code DELETE_TOPIC_IN_NAMESRV} request for {@code topic} directly to
-     * {@code addr}.
-     *
-     * @throws MQClientException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public void deleteTopicInNameServer(final String addr, final String topic, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        final DeleteTopicRequestHeader header = new DeleteTopicRequestHeader();
-        header.setTopic(topic);
-
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, header);
-        final RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            return;
-        }
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a {@code DELETE_SUBSCRIPTIONGROUP} request for {@code groupName}; the target address
-     * is passed through {@code MixAll.brokerVIPChannel} first.
-     *
-     * @throws MQClientException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public void deleteSubscriptionGroup(final String addr, final String groupName, final long timeoutMillis)
-            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
-        final DeleteSubscriptionGroupRequestHeader header = new DeleteSubscriptionGroupRequestHeader();
-        header.setGroupName(groupName);
-
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, header);
-        final String target = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
-        final RemotingCommand response = this.remotingClient.invokeSync(target, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            return;
-        }
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a {@code GET_KV_CONFIG} request for {@code namespace}/{@code key} with a
-     * {@code null} target address and returns the value carried in the response header.
-     *
-     * @return the value reported by the response header
-     * @throws MQClientException if the response code is not {@code ResponseCode.SUCCESS}
-     */
-    public String getKVConfigValue(final String namespace, final String key, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final GetKVConfigRequestHeader header = new GetKVConfigRequestHeader();
-        header.setNamespace(namespace);
-        header.setKey(key);
-
-        final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_KV_CONFIG, header);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQClientException(response.getCode(), response.getRemark());
-        }
-
-        final GetKVConfigResponseHeader valueHeader =
-                (GetKVConfigResponseHeader) response.decodeCommandCustomHeader(GetKVConfigResponseHeader.class);
-        return valueHeader.getValue();
-    }
-
-
-    public void putKVConfigValue(final String namespace, final String key, 
final String value, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        PutKVConfigRequestHeader requestHeader = new 
PutKVConfigRequestHeader();
-        requestHeader.setNamespace(namespace);
-        requestHeader.setKey(key);
-        requestHeader.setValue(value);
-
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.PUT_KV_CONFIG, requestHeader);
-
-        List<String> nameServerAddressList = 
this.remotingClient.getNameServerAddressList();
-        if (nameServerAddressList != null) {
-            RemotingCommand errResponse = null;
-            for (String namesrvAddr : nameServerAddressList) {
-                RemotingCommand response = 
this.remotingClient.invokeSync(namesrvAddr, request, timeoutMillis);
-                assert response != null;
-                switch (response.getCode()) {
-                    case ResponseCode.SUCCESS: {
-                        break;
-                    }
-                    default:
-                        errResponse = response;
-                }
-            }
-
-            if (errResponse != null) {
-                throw new MQClientException(errResponse.getCode(), 
errResponse.getRemark());
-            }
-        }
-    }
-
-
-    /**
-     * Sends one {@code DELETE_KV_CONFIG} request to every address returned by
-     * {@code remotingClient.getNameServerAddressList()}.
-     * <p>
-     * The loop does not stop at the first failure; the last non-successful response is kept
-     * and turned into an exception after every address has been tried. A {@code null}
-     * address list results in no request at all.
-     *
-     * @param namespace namespace written into the request header
-     * @param key key written into the request header
-     * @param timeoutMillis timeout handed to each synchronous invocation
-     * @throws MQClientException carrying the code and remark of the last failed response
-     */
-    public void deleteKVConfigValue(final String namespace, final String key, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final DeleteKVConfigRequestHeader header = new DeleteKVConfigRequestHeader();
-        header.setNamespace(namespace);
-        header.setKey(key);
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.DELETE_KV_CONFIG, header);
-
-        final List<String> nameServers = this.remotingClient.getNameServerAddressList();
-        if (nameServers == null) {
-            return;
-        }
-
-        RemotingCommand lastFailure = null;
-        for (final String nameServer : nameServers) {
-            final RemotingCommand response =
-                    this.remotingClient.invokeSync(nameServer, request, timeoutMillis);
-            assert response != null;
-            if (response.getCode() != ResponseCode.SUCCESS) {
-                lastFailure = response;
-            }
-        }
-
-        if (lastFailure != null) {
-            throw new MQClientException(lastFailure.getCode(), lastFailure.getRemark());
-        }
-    }
-
-
-    /**
-     * Fetches the key/value table of {@code namespace} with a
-     * {@code GET_KVLIST_BY_NAMESPACE} request.
-     * <p>
-     * The request is sent with a {@code null} target address (presumably letting the remoting
-     * client choose a name server; TODO confirm).
-     *
-     * @param namespace namespace written into the request header
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the {@code KVTable} decoded from the response body
-     * @throws MQClientException if the response code is not {@code SUCCESS}
-     */
-    public KVTable getKVListByNamespace(final String namespace, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final GetKVListByNamespaceRequestHeader header = new GetKVListByNamespaceRequestHeader();
-        header.setNamespace(namespace);
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_KVLIST_BY_NAMESPACE, header);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
-        assert response != null;
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQClientException(response.getCode(), response.getRemark());
-        }
-        return KVTable.decode(response.getBody(), KVTable.class);
-    }
-
-
-    /**
-     * Six-argument convenience form of {@code invokeBrokerToResetOffset}: forwards every
-     * argument to the seven-argument overload and supplies {@code false} for {@code isC}.
-     *
-     * @return whatever the seven-argument overload returns
-     */
-    public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic,
-            final String group, final long timestamp, final boolean isForce, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final boolean isC = false;
-        return invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, isC);
-    }
-
-
-    /**
-     * Sends an {@code INVOKE_BROKER_TO_RESET_OFFSET} request to the broker and returns the
-     * offset table found in the response body.
-     * <p>
-     * A {@code SUCCESS} response without a body is reported exactly like a failure: an
-     * {@code MQClientException} built from the response code and remark.
-     *
-     * @param addr broker address, routed through {@code MixAll.brokerVIPChannel}
-     * @param topic topic written into the request header
-     * @param group group written into the request header
-     * @param timestamp timestamp written into the request header
-     * @param isForce force flag written into the request header
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @param isC when {@code true} the request language is set to {@code LanguageCode.CPP}
-     * @return the offset table of the decoded {@code ResetOffsetBody}
-     * @throws MQClientException if the response is not {@code SUCCESS} or carries no body
-     */
-    public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic,
-            final String group, final long timestamp, final boolean isForce, final long timeoutMillis,
-            boolean isC)
-            throws RemotingException, MQClientException, InterruptedException {
-        final ResetOffsetRequestHeader header = new ResetOffsetRequestHeader();
-        header.setTopic(topic);
-        header.setGroup(group);
-        header.setTimestamp(timestamp);
-        header.setForce(isForce);
-
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, header);
-        if (isC) {
-            request.setLanguage(LanguageCode.CPP);
-        }
-
-        final RemotingCommand response = this.remotingClient.invokeSync(
-                MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
-        assert response != null;
-
-        // Only a successful response that actually has a body yields a result.
-        if (response.getCode() == ResponseCode.SUCCESS && response.getBody() != null) {
-            final ResetOffsetBody decoded = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
-            return decoded.getOffsetTable();
-        }
-
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends an {@code INVOKE_BROKER_TO_GET_CONSUMER_STATUS} request to the broker and returns
-     * the consumer table found in the response body.
-     * <p>
-     * A {@code SUCCESS} response without a body ends in the same {@code MQClientException}
-     * as any other response code.
-     *
-     * @param addr broker address, routed through {@code MixAll.brokerVIPChannel}
-     * @param topic topic written into the request header
-     * @param group group written into the request header
-     * @param clientAddr client address written into the request header
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the consumer table of the decoded {@code GetConsumerStatusBody}
-     * @throws MQClientException if the response is not {@code SUCCESS} or carries no body
-     */
-    public Map<String, Map<MessageQueue, Long>> invokeBrokerToGetConsumerStatus(final String addr,
-            final String topic, final String group, final String clientAddr, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final GetConsumerStatusRequestHeader header = new GetConsumerStatusRequestHeader();
-        header.setTopic(topic);
-        header.setGroup(group);
-        header.setClientAddr(clientAddr);
-
-        final RemotingCommand request = RemotingCommand.createRequestCommand(
-                RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS, header);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(
-                MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
-        assert response != null;
-
-        if (response.getCode() == ResponseCode.SUCCESS && response.getBody() != null) {
-            final GetConsumerStatusBody decoded =
-                    GetConsumerStatusBody.decode(response.getBody(), GetConsumerStatusBody.class);
-            return decoded.getConsumerTable();
-        }
-
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a {@code QUERY_TOPIC_CONSUME_BY_WHO} request for {@code topic} to the broker and
-     * returns the {@code GroupList} decoded from the response body.
-     *
-     * @param addr broker address, routed through {@code MixAll.brokerVIPChannel}
-     * @param topic topic written into the request header
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the decoded {@code GroupList}
-     * @throws MQBrokerException if the response code is not {@code SUCCESS}
-     */
-    public GroupList queryTopicConsumeByWho(final String addr, final String topic, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-            InterruptedException, MQBrokerException {
-        final QueryTopicConsumeByWhoRequestHeader header = new QueryTopicConsumeByWhoRequestHeader();
-        header.setTopic(topic);
-
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO, header);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(
-                MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQBrokerException(response.getCode(), response.getRemark());
-        }
-        return GroupList.decode(response.getBody(), GroupList.class);
-    }
-
-
-    public List<QueueTimeSpan> queryConsumeTimeSpan(final String addr, final 
String topic, final String group, final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, 
RemotingTimeoutException, InterruptedException,
-            MQBrokerException {
-        QueryConsumeTimeSpanRequestHeader requestHeader = new 
QueryConsumeTimeSpanRequestHeader();
-        requestHeader.setTopic(topic);
-        requestHeader.setGroup(group);
-
-        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.QUERY_CONSUME_TIME_SPAN, 
requestHeader);
-
-        RemotingCommand response = 
this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(),
 addr),
-                request, timeoutMillis);
-        switch (response.getCode()) {
-            case ResponseCode.SUCCESS: {
-                QueryConsumeTimeSpanBody consumeTimeSpanBody = 
GroupList.decode(response.getBody(), QueryConsumeTimeSpanBody.class);
-                return consumeTimeSpanBody.getConsumeTimeSpanSet();
-            }
-            default:
-                break;
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Fetches the topics of {@code cluster} with a {@code GET_TOPICS_BY_CLUSTER} request.
-     * <p>
-     * The request is sent with a {@code null} target address (presumably letting the remoting
-     * client choose a name server; TODO confirm). A {@code SUCCESS} response without a body
-     * is reported as an {@code MQClientException}, just like any other response code.
-     *
-     * @param cluster cluster name written into the request header
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the {@code TopicList} decoded from the response body
-     * @throws MQClientException if the response is not {@code SUCCESS} or carries no body
-     */
-    public TopicList getTopicsByCluster(final String cluster, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final GetTopicsByClusterRequestHeader header = new GetTopicsByClusterRequestHeader();
-        header.setCluster(cluster);
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_TOPICS_BY_CLUSTER, header);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
-        assert response != null;
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            final byte[] payload = response.getBody();
-            if (payload != null) {
-                return TopicList.decode(payload, TopicList.class);
-            }
-        }
-
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a {@code REGISTER_MESSAGE_FILTER_CLASS} request whose body is {@code classBody}.
-     * <p>
-     * Unlike most broker calls in this class, {@code addr} is passed to the remoting client
-     * as given, without going through {@code MixAll.brokerVIPChannel}.
-     *
-     * @param addr address the request is sent to
-     * @param consumerGroup consumer group written into the request header
-     * @param topic topic written into the request header
-     * @param className class name written into the request header
-     * @param classCRC class CRC written into the request header
-     * @param classBody bytes set as the request body
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @throws MQBrokerException if the response code is not {@code SUCCESS}
-     */
-    public void registerMessageFilterClass(final String addr,
-                                           final String consumerGroup,
-                                           final String topic,
-                                           final String className,
-                                           final int classCRC,
-                                           final byte[] classBody,
-                                           final long timeoutMillis)
-            throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
-            InterruptedException, MQBrokerException {
-        final RegisterMessageFilterClassRequestHeader header = new RegisterMessageFilterClassRequestHeader();
-        header.setConsumerGroup(consumerGroup);
-        header.setClassName(className);
-        header.setTopic(topic);
-        header.setClassCRC(classCRC);
-
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.REGISTER_MESSAGE_FILTER_CLASS, header);
-        request.setBody(classBody);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            return;
-        }
-
-        throw new MQBrokerException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Fetches the system topic list with a {@code GET_SYSTEM_TOPIC_LIST_FROM_NS} request and,
-     * when the answer names a broker, merges in that broker's own system topics.
-     * <p>
-     * The broker is queried through {@code getSystemTopicListFromBroker} only when the decoded
-     * list is non-empty and its broker address is not blank. A {@code SUCCESS} response
-     * without a body is reported as an {@code MQClientException}.
-     *
-     * @param timeoutMillis timeout handed to each synchronous invocation
-     * @return the decoded {@code TopicList}, possibly extended with the broker's topics
-     * @throws MQClientException if the response is not {@code SUCCESS} or carries no body
-     */
-    public TopicList getSystemTopicList(final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS, null);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
-        assert response != null;
-
-        final byte[] payload = response.getCode() == ResponseCode.SUCCESS ? response.getBody() : null;
-        if (payload == null) {
-            throw new MQClientException(response.getCode(), response.getRemark());
-        }
-
-        final TopicList topicList = TopicList.decode(payload, TopicList.class);
-        final boolean hasTopics = topicList.getTopicList() != null && !topicList.getTopicList().isEmpty();
-        if (hasTopics && !UtilAll.isBlank(topicList.getBrokerAddr())) {
-            final TopicList fromBroker =
-                    getSystemTopicListFromBroker(topicList.getBrokerAddr(), timeoutMillis);
-            if (fromBroker.getTopicList() != null && !fromBroker.getTopicList().isEmpty()) {
-                topicList.getTopicList().addAll(fromBroker.getTopicList());
-            }
-        }
-        return topicList;
-    }
-
-
-    /**
-     * Fetches the system topic list of one broker with a
-     * {@code GET_SYSTEM_TOPIC_LIST_FROM_BROKER} request.
-     * <p>
-     * A {@code SUCCESS} response without a body is reported as an {@code MQClientException},
-     * just like any other response code.
-     *
-     * @param addr broker address, routed through {@code MixAll.brokerVIPChannel}
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the {@code TopicList} decoded from the response body
-     * @throws MQClientException if the response is not {@code SUCCESS} or carries no body
-     */
-    public TopicList getSystemTopicListFromBroker(final String addr, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER, null);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(
-                MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
-        assert response != null;
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            final byte[] payload = response.getBody();
-            if (payload != null) {
-                return TopicList.decode(payload, TopicList.class);
-            }
-        }
-
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-
-    /**
-     * Sends a {@code CLEAN_EXPIRED_CONSUMEQUEUE} request to the broker.
-     *
-     * @param addr broker address, routed through {@code MixAll.brokerVIPChannel}
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return {@code true} when the broker answers {@code SUCCESS}; no other value is returned
-     * @throws MQClientException if the response code is not {@code SUCCESS}
-     */
-    public boolean cleanExpiredConsumeQueue(final String addr, long timeoutMillis)
-            throws MQClientException, RemotingConnectException, RemotingSendRequestException,
-            RemotingTimeoutException, InterruptedException {
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, null);
-        final RemotingCommand response = this.remotingClient.invokeSync(
-                MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQClientException(response.getCode(), response.getRemark());
-        }
-        return true;
-    }
-
-
-    /**
-     * Sends a {@code CLEAN_UNUSED_TOPIC} request to the broker.
-     *
-     * @param addr broker address, routed through {@code MixAll.brokerVIPChannel}
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return {@code true} when the broker answers {@code SUCCESS}; no other value is returned
-     * @throws MQClientException if the response code is not {@code SUCCESS}
-     */
-    public boolean cleanUnusedTopicByAddr(final String addr, long timeoutMillis)
-            throws MQClientException, RemotingConnectException, RemotingSendRequestException,
-            RemotingTimeoutException, InterruptedException {
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null);
-        final RemotingCommand response = this.remotingClient.invokeSync(
-                MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
-        if (response.getCode() != ResponseCode.SUCCESS) {
-            throw new MQClientException(response.getCode(), response.getRemark());
-        }
-        return true;
-    }
-
-    /**
-     * Sends a {@code GET_CONSUMER_RUNNING_INFO} request to the broker and returns the running
-     * information decoded from the response body.
-     * <p>
-     * A {@code SUCCESS} response without a body is reported as an {@code MQClientException},
-     * just like any other response code.
-     *
-     * @param addr broker address, routed through {@code MixAll.brokerVIPChannel}
-     * @param consumerGroup consumer group written into the request header
-     * @param clientId client id written into the request header
-     * @param jstack value written into the header's {@code jstackEnable} field
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the decoded {@code ConsumerRunningInfo}
-     * @throws MQClientException if the response is not {@code SUCCESS} or carries no body
-     */
-    public ConsumerRunningInfo getConsumerRunningInfo(final String addr, String consumerGroup,
-            String clientId, boolean jstack, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final GetConsumerRunningInfoRequestHeader header = new GetConsumerRunningInfoRequestHeader();
-        header.setConsumerGroup(consumerGroup);
-        header.setClientId(clientId);
-        header.setJstackEnable(jstack);
-
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, header);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(
-                MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
-        assert response != null;
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            final byte[] payload = response.getBody();
-            if (payload != null) {
-                return ConsumerRunningInfo.decode(payload, ConsumerRunningInfo.class);
-            }
-        }
-
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-    /**
-     * Sends a {@code CONSUME_MESSAGE_DIRECTLY} request to the broker and returns the result
-     * decoded from the response body.
-     * <p>
-     * A {@code SUCCESS} response without a body is reported as an {@code MQClientException},
-     * just like any other response code.
-     *
-     * @param addr broker address, routed through {@code MixAll.brokerVIPChannel}
-     * @param consumerGroup consumer group written into the request header
-     * @param clientId client id written into the request header
-     * @param msgId message id written into the request header
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the decoded {@code ConsumeMessageDirectlyResult}
-     * @throws MQClientException if the response is not {@code SUCCESS} or carries no body
-     */
-    public ConsumeMessageDirectlyResult consumeMessageDirectly(final String addr,
-                                                               String consumerGroup,
-                                                               String clientId,
-                                                               String msgId,
-                                                               final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final ConsumeMessageDirectlyResultRequestHeader header =
-                new ConsumeMessageDirectlyResultRequestHeader();
-        header.setConsumerGroup(consumerGroup);
-        header.setClientId(clientId);
-        header.setMsgId(msgId);
-
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.CONSUME_MESSAGE_DIRECTLY, header);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(
-                MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
-        assert response != null;
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            final byte[] payload = response.getBody();
-            if (payload != null) {
-                return ConsumeMessageDirectlyResult.decode(payload, ConsumeMessageDirectlyResult.class);
-            }
-        }
-
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-    /**
-     * Sends a {@code QUERY_CORRECTION_OFFSET} request to the broker and returns the correction
-     * offsets found in the response body.
-     * <p>
-     * When {@code filterGroup} is not {@code null} its elements are joined with commas, in
-     * iteration order, and written into the header's filter groups field; otherwise that
-     * field is left unset. A {@code SUCCESS} response without a body is reported as an
-     * {@code MQClientException}.
-     *
-     * @param addr broker address, routed through {@code MixAll.brokerVIPChannel}
-     * @param topic topic written into the request header
-     * @param group written into the header as the compare group
-     * @param filterGroup optional group names to join into the filter groups field
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the correction offsets of the decoded {@code QueryCorrectionOffsetBody}
-     * @throws MQClientException if the response is not {@code SUCCESS} or carries no body
-     */
-    public Map<Integer, Long> queryCorrectionOffset(final String addr, final String topic,
-            final String group, Set<String> filterGroup, long timeoutMillis)
-            throws MQClientException, RemotingConnectException, RemotingSendRequestException,
-            RemotingTimeoutException, InterruptedException {
-        final QueryCorrectionOffsetHeader header = new QueryCorrectionOffsetHeader();
-        header.setCompareGroup(group);
-        header.setTopic(topic);
-        if (filterGroup != null) {
-            final StringBuilder joined = new StringBuilder();
-            boolean first = true;
-            for (final String name : filterGroup) {
-                // Separator goes before every element except the first one.
-                if (!first) {
-                    joined.append(",");
-                }
-                joined.append(name);
-                first = false;
-            }
-            header.setFilterGroups(joined.toString());
-        }
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.QUERY_CORRECTION_OFFSET, header);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(
-                MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
-        assert response != null;
-        if (response.getCode() == ResponseCode.SUCCESS && response.getBody() != null) {
-            final QueryCorrectionOffsetBody decoded =
-                    QueryCorrectionOffsetBody.decode(response.getBody(), QueryCorrectionOffsetBody.class);
-            return decoded.getCorrectionOffsets();
-        }
-
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-    /**
-     * Fetches a topic list with a {@code GET_UNIT_TOPIC_LIST} request, optionally dropping
-     * retry topics from it.
-     * <p>
-     * The request is sent with a {@code null} target address (presumably letting the remoting
-     * client choose a name server; TODO confirm). A {@code SUCCESS} response without a body
-     * is reported as an {@code MQClientException}.
-     *
-     * @param containRetry when {@code false}, every topic whose name starts with
-     *        {@code MixAll.RETRY_GROUP_TOPIC_PREFIX} is removed from the result
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the decoded, possibly filtered {@code TopicList}
-     * @throws MQClientException if the response is not {@code SUCCESS} or carries no body
-     */
-    public TopicList getUnitTopicList(final boolean containRetry, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_UNIT_TOPIC_LIST, null);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
-        assert response != null;
-
-        final byte[] payload = response.getCode() == ResponseCode.SUCCESS ? response.getBody() : null;
-        if (payload == null) {
-            throw new MQClientException(response.getCode(), response.getRemark());
-        }
-
-        final TopicList topicList = TopicList.decode(payload, TopicList.class);
-        if (!containRetry) {
-            for (Iterator<String> names = topicList.getTopicList().iterator(); names.hasNext();) {
-                if (names.next().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-                    names.remove();
-                }
-            }
-        }
-        return topicList;
-    }
-
-
-    /**
-     * Fetches a topic list with a {@code GET_HAS_UNIT_SUB_TOPIC_LIST} request, optionally
-     * dropping retry topics from it.
-     * <p>
-     * The request is sent with a {@code null} target address (presumably letting the remoting
-     * client choose a name server; TODO confirm). A {@code SUCCESS} response without a body
-     * is reported as an {@code MQClientException}.
-     *
-     * @param containRetry when {@code false}, every topic whose name starts with
-     *        {@code MixAll.RETRY_GROUP_TOPIC_PREFIX} is removed from the result
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the decoded, possibly filtered {@code TopicList}
-     * @throws MQClientException if the response is not {@code SUCCESS} or carries no body
-     */
-    public TopicList getHasUnitSubTopicList(final boolean containRetry, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST, null);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
-        assert response != null;
-
-        final byte[] payload = response.getCode() == ResponseCode.SUCCESS ? response.getBody() : null;
-        if (payload == null) {
-            throw new MQClientException(response.getCode(), response.getRemark());
-        }
-
-        final TopicList topicList = TopicList.decode(payload, TopicList.class);
-        if (!containRetry) {
-            for (Iterator<String> names = topicList.getTopicList().iterator(); names.hasNext();) {
-                if (names.next().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-                    names.remove();
-                }
-            }
-        }
-        return topicList;
-    }
-
-
-    /**
-     * Fetches a topic list with a {@code GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST} request,
-     * optionally dropping retry topics from it.
-     * <p>
-     * The request is sent with a {@code null} target address (presumably letting the remoting
-     * client choose a name server; TODO confirm). A {@code SUCCESS} response without a body
-     * is reported as an {@code MQClientException}.
-     *
-     * @param containRetry when {@code false}, every topic whose name starts with
-     *        {@code MixAll.RETRY_GROUP_TOPIC_PREFIX} is removed from the result
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @return the decoded, possibly filtered {@code TopicList}
-     * @throws MQClientException if the response is not {@code SUCCESS} or carries no body
-     */
-    public TopicList getHasUnitSubUnUnitTopicList(final boolean containRetry, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final RemotingCommand request = RemotingCommand.createRequestCommand(
-                RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST, null);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
-        assert response != null;
-
-        final byte[] payload = response.getCode() == ResponseCode.SUCCESS ? response.getBody() : null;
-        if (payload == null) {
-            throw new MQClientException(response.getCode(), response.getRemark());
-        }
-
-        final TopicList topicList = TopicList.decode(payload, TopicList.class);
-        if (!containRetry) {
-            for (Iterator<String> names = topicList.getTopicList().iterator(); names.hasNext();) {
-                if (names.next().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
-                    names.remove();
-                }
-            }
-        }
-        return topicList;
-    }
-
-
-    /**
-     * Sends a {@code CLONE_GROUP_OFFSET} request to the broker.
-     *
-     * @param addr broker address, routed through {@code MixAll.brokerVIPChannel}
-     * @param srcGroup source group written into the request header
-     * @param destGroup destination group written into the request header
-     * @param topic topic written into the request header
-     * @param isOffline offline flag written into the request header
-     * @param timeoutMillis timeout handed to the synchronous invocation
-     * @throws MQClientException if the response code is not {@code SUCCESS}
-     */
-    public void cloneGroupOffset(final String addr, final String srcGroup, final String destGroup,
-            final String topic, final boolean isOffline, final long timeoutMillis)
-            throws RemotingException, MQClientException, InterruptedException {
-        final CloneGroupOffsetRequestHeader header = new CloneGroupOffsetRequestHeader();
-        header.setSrcGroup(srcGroup);
-        header.setDestGroup(destGroup);
-        header.setTopic(topic);
-        header.setOffline(isOffline);
-        final RemotingCommand request =
-                RemotingCommand.createRequestCommand(RequestCode.CLONE_GROUP_OFFSET, header);
-
-        final RemotingCommand response = this.remotingClient.invokeSync(
-                MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
-                request, timeoutMillis);
-        assert response != null;
-        if (response.getCode() == ResponseCode.SUCCESS) {
-            return;
-        }
-
-        throw new MQClientException(response.getCode(), response.getRemark());
-    }
-
-
-    public BrokerStatsData viewBrokerStatsData(String brokerAddr, String 
statsName, String statsKey, long timeoutMillis)
-            throws MQClientException, RemotingConnectException, 
RemotingSendRequestException, RemotingTimeoutException,
-            InterruptedException

<TRUNCATED>

Reply via email to