http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQClientAPIImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQClientAPIImpl.cpp 
b/rocketmq-client4cpp/src/MQClientAPIImpl.cpp
new file mode 100755
index 0000000..fa5a2b9
--- /dev/null
+++ b/rocketmq-client4cpp/src/MQClientAPIImpl.cpp
@@ -0,0 +1,1323 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include <assert.h>
+
+#include "MQClientAPIImpl.h"
+#include "MQClientException.h"
+#include "SocketUtil.h"
+#include "UtilAll.h"
+#include "TcpRemotingClient.h"
+#include "MQProtos.h"
+#include "PullResultExt.h"
+#include "ConsumerInvokeCallback.h"
+#include "NamesrvUtil.h"
+#include "VirtualEnvUtil.h"
+#include "ClientRemotingProcessor.h"
+#include "CommandCustomHeader.h"
+#include "TopicList.h"
+#include "ProducerInvokeCallback.h"
+#include "MessageDecoder.h"
+#include "MessageSysFlag.h"
+#include "GetConsumerListByGroupResponseBody.h"
+
+
+namespace rmq
+{
+
+
+/**
+ * Creates the TCP remoting client from remoteClientConfig and registers
+ * pClientRemotingProcessor as the processor for six request codes.
+ *
+ * @param clientConfig              accepted but not read by this constructor
+ * @param remoteClientConfig        passed to the TcpRemotingClient constructor
+ * @param pClientRemotingProcessor  stored and registered for each code below
+ */
+MQClientAPIImpl::MQClientAPIImpl(ClientConfig& clientConfig,
+                                 const RemoteClientConfig& remoteClientConfig,
+                                 ClientRemotingProcessor* pClientRemotingProcessor)
+    : m_pClientRemotingProcessor(pClientRemotingProcessor)
+{
+    m_pRemotingClient = new TcpRemotingClient(remoteClientConfig);
+
+    // The same processor instance handles every registered request code.
+    m_pRemotingClient->registerProcessor(CHECK_TRANSACTION_STATE_VALUE,
+                                         m_pClientRemotingProcessor);
+    m_pRemotingClient->registerProcessor(NOTIFY_CONSUMER_IDS_CHANGED_VALUE,
+                                         m_pClientRemotingProcessor);
+    m_pRemotingClient->registerProcessor(RESET_CONSUMER_CLIENT_OFFSET_VALUE,
+                                         m_pClientRemotingProcessor);
+    m_pRemotingClient->registerProcessor(GET_CONSUMER_STATUS_FROM_CLIENT_VALUE,
+                                         m_pClientRemotingProcessor);
+    m_pRemotingClient->registerProcessor(GET_CONSUMER_RUNNING_INFO_VALUE,
+                                         m_pClientRemotingProcessor);
+    m_pRemotingClient->registerProcessor(CONSUME_MESSAGE_DIRECTLY_VALUE,
+                                         m_pClientRemotingProcessor);
+}
+
+/**
+ * Destructor. Performs no cleanup of its own.
+ * NOTE(review): m_pRemotingClient is allocated with new in the constructor and
+ * is not deleted here - confirm whether it is released elsewhere.
+ */
+MQClientAPIImpl::~MQClientAPIImpl()
+{
+}
+
+/**
+ * Returns a copy of the project-group prefix currently stored in
+ * m_projectGroupPrefix (assigned in start()).
+ */
+std::string MQClientAPIImpl::getProjectGroupPrefix()
+{
+    return m_projectGroupPrefix;
+}
+
+/**
+ * Returns the name server address list as reported by the remoting client.
+ */
+std::vector<std::string> MQClientAPIImpl::getNameServerAddressList()
+{
+    return m_pRemotingClient->getNameServerAddressList();
+}
+
+/**
+ * Returns the raw pointer to the remoting client created in the constructor.
+ */
+TcpRemotingClient* MQClientAPIImpl::getRemotingClient()
+{
+    return m_pRemotingClient;
+}
+
+/**
+ * Asks m_topAddressing for the name server address string and, when it is
+ * non-empty and differs from the cached m_nameSrvAddr, pushes it to the
+ * remoting client and caches it.
+ *
+ * Any exception raised while fetching or updating is logged and swallowed.
+ *
+ * @return the cached name server address string (updated or unchanged)
+ */
+std::string MQClientAPIImpl::fetchNameServerAddr()
+{
+    try
+    {
+        const std::string fetched = m_topAddressing.fetchNSAddr();
+        const bool changed = !fetched.empty() && fetched != m_nameSrvAddr;
+        if (changed)
+        {
+            RMQ_INFO("name server address changed, %s -> %s",
+                     m_nameSrvAddr.c_str(), fetched.c_str());
+            updateNameServerAddressList(fetched);
+            m_nameSrvAddr = fetched;
+        }
+    }
+    catch (...)
+    {
+        RMQ_ERROR("fetchNameServerAddr Exception");
+    }
+
+    return m_nameSrvAddr;
+}
+
+/**
+ * Caches addrs in m_nameSrvAddr, splits it on ';' and, if the split yields at
+ * least one entry, hands the resulting list to the remoting client.
+ *
+ * @param addrs  ';'-separated name server addresses
+ */
+void MQClientAPIImpl::updateNameServerAddressList(const std::string& addrs)
+{
+    m_nameSrvAddr = addrs;
+
+    std::vector<std::string> addressList;
+    UtilAll::Split(addressList, addrs, ";");
+    if (!addressList.empty())
+    {
+        m_pRemotingClient->updateNameServerAddressList(addressList);
+    }
+}
+
+/**
+ * Starts the remoting client, then tries to resolve the project-group prefix
+ * for the local address (3000 ms timeout).
+ *
+ * The prefix lookup is best-effort: a std::exception raised by it is logged
+ * and m_projectGroupPrefix is left as it was.
+ */
+void MQClientAPIImpl::start()
+{
+    m_pRemotingClient->start();
+
+    try
+    {
+        std::string localAddress = getLocalAddress();
+        m_projectGroupPrefix = getProjectGroupByIp(localAddress, 3000);
+    }
+    catch (const std::exception& e)
+    {
+        // Caught by const reference (no copy, no slicing) and reported
+        // instead of being dropped silently; startup still proceeds.
+        RMQ_INFO("resolve project group prefix failed, ignored: %s", e.what());
+    }
+}
+
+/**
+ * Shuts down the remoting client. The client object itself is not deleted here.
+ */
+void MQClientAPIImpl::shutdown()
+{
+    m_pRemotingClient->shutdown();
+}
+
+/**
+ * Not implemented: the body is empty, no request is sent and all parameters
+ * are ignored.
+ */
+void MQClientAPIImpl::createSubscriptionGroup(const std::string& addr,
+        SubscriptionGroupConfig config,
+        int timeoutMillis)
+{
+    // TODO: not implemented yet.
+}
+
+
+/**
+ * Sends a synchronous UPDATE_AND_CREATE_TOPIC_VALUE request built from
+ * topicConfig to addr.
+ *
+ * When a project-group prefix is set, the topic name is rewritten with
+ * VirtualEnvUtil::buildWithProjectGroup before being sent.
+ *
+ * @param addr           target address passed to invokeSync
+ * @param defaultTopic   copied into the request header's defaultTopic
+ * @param topicConfig    source of topic name, queue counts, perm, filter type,
+ *                       sys flag and order flag
+ * @param timeoutMillis  timeout passed to invokeSync
+ *
+ * Raises MQClientException (via THROW_MQEXCEPTION) when there is no response
+ * or when the response code is not SUCCESS_VALUE.
+ */
+void MQClientAPIImpl::createTopic(const std::string& addr,
+                                  const std::string& defaultTopic,
+                                  TopicConfig topicConfig,
+                                  int timeoutMillis)
+{
+    std::string effectiveTopic = topicConfig.getTopicName();
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        effectiveTopic = VirtualEnvUtil::buildWithProjectGroup(topicConfig.getTopicName(),
+                                                               m_projectGroupPrefix);
+    }
+
+    CreateTopicRequestHeader* header = new CreateTopicRequestHeader();
+    header->topic = effectiveTopic;
+    header->defaultTopic = defaultTopic;
+    header->readQueueNums = topicConfig.getReadQueueNums();
+    header->writeQueueNums = topicConfig.getWriteQueueNums();
+    header->perm = topicConfig.getPerm();
+    header->topicFilterType = topicConfig.getTopicFilterType();
+    header->topicSysFlag = topicConfig.getTopicSysFlag();
+    header->order = topicConfig.isOrder();
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(UPDATE_AND_CREATE_TOPIC_VALUE, header);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+
+    if (!response)
+    {
+        THROW_MQEXCEPTION(MQClientException, "createTopic failed", -1);
+    }
+    if (response->getCode() == SUCCESS_VALUE)
+    {
+        return;
+    }
+    THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+}
+
+/**
+ * Builds a send-message request from pRequestHeader and msg and dispatches it
+ * according to communicationMode.
+ *
+ * When a project-group prefix is set, msg's topic and the header's
+ * producerGroup/topic are rewritten in place before sending.
+ *
+ * The V2 header path is always taken (the flag below is hard-coded to true):
+ * a SendMessageRequestHeaderV2 is created from pRequestHeader and
+ * pRequestHeader is then deleted, so callers must not use it afterwards.
+ *
+ * @param addr               target address
+ * @param brokerName         forwarded to the sync/async send helpers
+ * @param msg                message; its compressed body is sent when present,
+ *                           otherwise its plain body
+ * @param pRequestHeader     heap-allocated header, consumed by this call
+ * @param timeoutMillis      timeout forwarded to the remoting call
+ * @param communicationMode  ONEWAY, ASYNC or SYNC
+ * @param pSendCallback      forwarded to sendMessageAsync in ASYNC mode
+ * @return the broker result in SYNC mode (when sendMessageSync returns one);
+ *         a default-constructed SendResult otherwise
+ */
+SendResult MQClientAPIImpl::sendMessage(const std::string& addr,
+                                        const std::string& brokerName,
+                                        Message& msg,
+                                        SendMessageRequestHeader* pRequestHeader,
+                                        int timeoutMillis,
+                                        CommunicationMode communicationMode,
+                                        SendCallback* pSendCallback)
+{
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        msg.setTopic(VirtualEnvUtil::buildWithProjectGroup(msg.getTopic(), m_projectGroupPrefix));
+        pRequestHeader->producerGroup =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->producerGroup, m_projectGroupPrefix);
+        pRequestHeader->topic =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, m_projectGroupPrefix);
+    }
+
+    bool useV2Header = true;
+    RemotingCommandPtr request = NULL;
+    if (useV2Header)
+    {
+        SendMessageRequestHeaderV2* headerV2 =
+            SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2(pRequestHeader);
+        request = RemotingCommand::createRequestCommand(SEND_MESSAGE_V2_VALUE, headerV2);
+        delete pRequestHeader;
+    }
+    else
+    {
+        request = RemotingCommand::createRequestCommand(SEND_MESSAGE_VALUE, pRequestHeader);
+    }
+
+    // NOTE(review): the trailing 'false' presumably means "do not copy the
+    // buffer" - confirm against RemotingCommand::setBody.
+    if (msg.getCompressBody() == NULL)
+    {
+        request->setBody((char*)msg.getBody(), msg.getBodyLen(), false);
+    }
+    else
+    {
+        request->setBody((char*)msg.getCompressBody(), msg.getCompressBodyLen(), false);
+    }
+
+    SendResult result;
+    if (communicationMode == ONEWAY)
+    {
+        m_pRemotingClient->invokeOneway(addr, request, timeoutMillis);
+    }
+    else if (communicationMode == ASYNC)
+    {
+        sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, pSendCallback);
+    }
+    else if (communicationMode == SYNC)
+    {
+        SendResult* syncResult = sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
+        if (syncResult)
+        {
+            result = *syncResult;
+            delete syncResult;
+        }
+    }
+    return result;
+}
+
+/**
+ * Builds a PULL_MESSAGE_VALUE request from pRequestHeader and dispatches it
+ * according to communicationMode.
+ *
+ * When a project-group prefix is set, the header's consumerGroup and topic
+ * are rewritten in place first.
+ *
+ * @param addr               target address
+ * @param pRequestHeader     header handed to createRequestCommand
+ * @param timeoutMillis      timeout forwarded to the pull helper
+ * @param communicationMode  SYNC pulls synchronously; ASYNC dispatches with
+ *                           pPullCallback; ONEWAY sends nothing; any other
+ *                           value trips assert(false)
+ * @param pPullCallback      callback used in ASYNC mode
+ * @return the result of pullMessageSync in SYNC mode, NULL otherwise
+ */
+PullResult* MQClientAPIImpl::pullMessage(const std::string& addr,
+        PullMessageRequestHeader* pRequestHeader,
+        int timeoutMillis,
+        CommunicationMode communicationMode,
+        PullCallback* pPullCallback)
+{
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        pRequestHeader->consumerGroup =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->consumerGroup, m_projectGroupPrefix);
+        pRequestHeader->topic =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, m_projectGroupPrefix);
+    }
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(PULL_MESSAGE_VALUE, pRequestHeader);
+
+    PullResult* pulled = NULL;
+    if (communicationMode == SYNC)
+    {
+        pulled = pullMessageSync(addr, request, timeoutMillis);
+    }
+    else if (communicationMode == ASYNC)
+    {
+        pullMessageAsync(addr, request, timeoutMillis, pPullCallback);
+    }
+    else if (communicationMode == ONEWAY)
+    {
+        // Nothing is sent for ONEWAY pulls.
+    }
+    else
+    {
+        assert(false);
+    }
+
+    return pulled;
+}
+
+/**
+ * Sends a synchronous VIEW_MESSAGE_BY_ID_VALUE request for the message at
+ * phyoffset and decodes the response body into a MessageExt.
+ *
+ * When a project-group prefix is set, the prefix is cleared from the decoded
+ * message's topic.
+ *
+ * @param addr           target address
+ * @param phyoffset      stored in the request header's offset field
+ * @param timeoutMillis  timeout passed to invokeSync
+ * @return whatever MessageDecoder::decode returns for the body (may be NULL
+ *         if the decoder yields NULL). NOTE(review): the caller presumably
+ *         owns the returned object - confirm.
+ *
+ * Raises MQClientException (via THROW_MQEXCEPTION) when there is no response,
+ * when the response code is not SUCCESS_VALUE, or when a successful response
+ * carries no body.
+ */
+MessageExt* MQClientAPIImpl::viewMessage(const std::string& addr, long long phyoffset, int timeoutMillis)
+{
+    ViewMessageRequestHeader* requestHeader = new ViewMessageRequestHeader();
+    requestHeader->offset = phyoffset;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(VIEW_MESSAGE_BY_ID_VALUE, requestHeader);
+
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response)
+    {
+        // A successful response without a body is treated as a failure below
+        // (previously reached through an implicit case fall-through).
+        if (response->getCode() == SUCCESS_VALUE && response->getBody() != NULL)
+        {
+            int len = 0;
+            MessageExt* messageExt = MessageDecoder::decode((char*)response->getBody(),
+                                                            response->getBodyLen(), len);
+            // Guard against a NULL decode result before touching the message.
+            if (messageExt != NULL && !UtilAll::isBlank(m_projectGroupPrefix))
+            {
+                messageExt->setTopic(VirtualEnvUtil::clearProjectGroup(messageExt->getTopic(),
+                                                                       m_projectGroupPrefix));
+            }
+            return messageExt;
+        }
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "viewMessage failed", -1);
+}
+
+/**
+ * Sends a synchronous SEARCH_OFFSET_BY_TIMESTAMP_VALUE request and returns the
+ * offset from the response header.
+ *
+ * When a project-group prefix is set, the topic is rewritten with
+ * VirtualEnvUtil::buildWithProjectGroup before being sent.
+ *
+ * @param addr           target address
+ * @param topic          topic name
+ * @param queueId        queue id placed in the request header
+ * @param timestamp      timestamp placed in the request header
+ * @param timeoutMillis  timeout passed to invokeSync
+ * @return the offset on success; -1 when there is no response, the response
+ *         code is not SUCCESS_VALUE, or the response has no custom header.
+ *         This function reports failure through -1 rather than an exception.
+ */
+long long MQClientAPIImpl::searchOffset(const std::string& addr,
+                                        const std::string& topic,
+                                        int queueId,
+                                        long long timestamp,
+                                        int timeoutMillis)
+{
+    std::string topicWithProjectGroup = topic;
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        topicWithProjectGroup = VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix);
+    }
+
+    SearchOffsetRequestHeader* pRequestHeader = new SearchOffsetRequestHeader();
+    pRequestHeader->topic = topicWithProjectGroup;
+    pRequestHeader->queueId = queueId;
+    pRequestHeader->timestamp = timestamp;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(SEARCH_OFFSET_BY_TIMESTAMP_VALUE, pRequestHeader);
+
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response && response->getCode() == SUCCESS_VALUE)
+    {
+        SearchOffsetResponseHeader* ret =
+            (SearchOffsetResponseHeader*)response->getCommandCustomHeader();
+        // Do not dereference a missing header; fall back to the -1 result.
+        if (ret != NULL)
+        {
+            return ret->offset;
+        }
+    }
+
+    return -1;
+}
+
+/**
+ * Sends a synchronous GET_MAX_OFFSET_VALUE request and returns the offset from
+ * the response header.
+ *
+ * When a project-group prefix is set, the topic is rewritten with
+ * VirtualEnvUtil::buildWithProjectGroup before being sent.
+ *
+ * @param addr           target address
+ * @param topic          topic name
+ * @param queueId        queue id placed in the request header
+ * @param timeoutMillis  timeout passed to invokeSync
+ * @return the offset on success; -1 when there is no response, the response
+ *         code is not SUCCESS_VALUE, or the response has no custom header.
+ *         This function reports failure through -1 rather than an exception.
+ */
+long long MQClientAPIImpl::getMaxOffset(const std::string& addr,
+                                        const std::string& topic,
+                                        int queueId,
+                                        int timeoutMillis)
+{
+    std::string topicWithProjectGroup = topic;
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        topicWithProjectGroup = VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix);
+    }
+
+    GetMaxOffsetRequestHeader* pRequestHeader = new GetMaxOffsetRequestHeader();
+    pRequestHeader->topic = topicWithProjectGroup;
+    pRequestHeader->queueId = queueId;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(GET_MAX_OFFSET_VALUE, pRequestHeader);
+
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response && response->getCode() == SUCCESS_VALUE)
+    {
+        GetMaxOffsetResponseHeader* ret =
+            (GetMaxOffsetResponseHeader*)response->getCommandCustomHeader();
+        // Do not dereference a missing header; fall back to the -1 result.
+        if (ret != NULL)
+        {
+            return ret->offset;
+        }
+    }
+
+    return -1;
+}
+
+
+/**
+ * Sends a synchronous GET_CONSUMER_LIST_BY_GROUP_VALUE request and returns the
+ * consumer id list decoded from the response body.
+ *
+ * When a project-group prefix is set, the consumer group is rewritten with
+ * VirtualEnvUtil::buildWithProjectGroup before being sent.
+ *
+ * @param addr           target address
+ * @param consumerGroup  consumer group name
+ * @param timeoutMillis  timeout passed to invokeSync
+ * @return a copy of the decoded body's consumer id list
+ *
+ * Raises MQClientException (via THROW_MQEXCEPTION) when there is no response,
+ * when the response code is not SUCCESS_VALUE, or when a successful response
+ * has no body or a body that cannot be decoded.
+ */
+std::list<std::string> MQClientAPIImpl::getConsumerIdListByGroup(const std::string& addr,
+        const std::string& consumerGroup,
+        int timeoutMillis)
+{
+    std::string consumerGroupWithProjectGroup = consumerGroup;
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        consumerGroupWithProjectGroup =
+            VirtualEnvUtil::buildWithProjectGroup(consumerGroup, m_projectGroupPrefix);
+    }
+
+    GetConsumerListByGroupRequestHeader* requestHeader = new GetConsumerListByGroupRequestHeader();
+    requestHeader->consumerGroup = consumerGroupWithProjectGroup;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(GET_CONSUMER_LIST_BY_GROUP_VALUE, requestHeader);
+
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response)
+    {
+        if (response->getCode() == SUCCESS_VALUE && response->getBody() != NULL)
+        {
+            GetConsumerListByGroupResponseBody* body =
+                GetConsumerListByGroupResponseBody::decode((char*)response->getBody(),
+                                                           response->getBodyLen());
+            // A NULL decode result falls through to the exception below
+            // instead of being dereferenced.
+            if (body != NULL)
+            {
+                std::list<std::string> ret = body->getConsumerIdList();
+                delete body;
+                return ret;
+            }
+        }
+
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "getConsumerIdListByGroup failed", -1);
+}
+
+/**
+ * Sends a synchronous GET_MIN_OFFSET_VALUE request and returns the offset from
+ * the response header.
+ *
+ * When a project-group prefix is set, the topic is rewritten with
+ * VirtualEnvUtil::buildWithProjectGroup before being sent.
+ *
+ * @param addr           target address
+ * @param topic          topic name
+ * @param queueId        queue id placed in the request header
+ * @param timeoutMillis  timeout passed to invokeSync
+ * @return the offset on success; -1 when there is no response, the response
+ *         code is not SUCCESS_VALUE, or the response has no custom header.
+ *         This function reports failure through -1 rather than an exception.
+ */
+long long MQClientAPIImpl::getMinOffset(const std::string& addr,
+                                        const std::string& topic,
+                                        int queueId,
+                                        int timeoutMillis)
+{
+    std::string topicWithProjectGroup = topic;
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        topicWithProjectGroup = VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix);
+    }
+
+    GetMinOffsetRequestHeader* pRequestHeader = new GetMinOffsetRequestHeader();
+    pRequestHeader->topic = topicWithProjectGroup;
+    pRequestHeader->queueId = queueId;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(GET_MIN_OFFSET_VALUE, pRequestHeader);
+
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response && response->getCode() == SUCCESS_VALUE)
+    {
+        GetMinOffsetResponseHeader* ret =
+            (GetMinOffsetResponseHeader*)response->getCommandCustomHeader();
+        // Do not dereference a missing header; fall back to the -1 result.
+        if (ret != NULL)
+        {
+            return ret->offset;
+        }
+    }
+
+    return -1;
+}
+
+/**
+ * Sends a synchronous GET_EARLIEST_MSG_STORETIME_VALUE request and returns the
+ * timestamp from the response header.
+ *
+ * When a project-group prefix is set, the topic is rewritten with
+ * VirtualEnvUtil::buildWithProjectGroup before being sent.
+ *
+ * @param addr           target address
+ * @param topic          topic name
+ * @param queueId        queue id placed in the request header
+ * @param timeoutMillis  timeout passed to invokeSync
+ * @return the timestamp field of the response header
+ *
+ * Raises MQClientException (via THROW_MQEXCEPTION) when there is no response,
+ * when the response code is not SUCCESS_VALUE, or when a successful response
+ * has no custom header.
+ */
+long long MQClientAPIImpl::getEarliestMsgStoretime(const std::string& addr,
+        const std::string& topic,
+        int queueId,
+        int timeoutMillis)
+{
+    std::string topicWithProjectGroup = topic;
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        topicWithProjectGroup = VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix);
+    }
+
+    GetEarliestMsgStoretimeRequestHeader* pRequestHeader = new GetEarliestMsgStoretimeRequestHeader();
+    pRequestHeader->topic = topicWithProjectGroup;
+    pRequestHeader->queueId = queueId;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(GET_EARLIEST_MSG_STORETIME_VALUE, pRequestHeader);
+
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response)
+    {
+        if (response->getCode() == SUCCESS_VALUE)
+        {
+            GetEarliestMsgStoretimeResponseHeader* ret =
+                (GetEarliestMsgStoretimeResponseHeader*)response->getCommandCustomHeader();
+            // A missing header is reported through the exception below
+            // instead of being dereferenced.
+            if (ret != NULL)
+            {
+                return ret->timestamp;
+            }
+        }
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "getEarliestMsgStoretime failed", -1);
+}
+
+/**
+ * Sends a synchronous QUERY_CONSUMER_OFFSET_VALUE request built from
+ * pRequestHeader and returns the offset from the response header.
+ *
+ * When a project-group prefix is set, the header's consumerGroup and topic
+ * are rewritten in place first.
+ *
+ * @param addr            target address
+ * @param pRequestHeader  header handed to createRequestCommand
+ * @param timeoutMillis   timeout passed to invokeSync
+ * @return the offset field of the response header
+ *
+ * Raises MQBrokerException (via THROW_MQEXCEPTION) when the response code is
+ * not SUCCESS_VALUE or a successful response has no custom header, and
+ * MQClientException when there is no response at all.
+ */
+long long MQClientAPIImpl::queryConsumerOffset(const std::string& addr,
+        QueryConsumerOffsetRequestHeader* pRequestHeader,
+        int timeoutMillis)
+{
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        pRequestHeader->consumerGroup =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->consumerGroup, m_projectGroupPrefix);
+        pRequestHeader->topic =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, m_projectGroupPrefix);
+    }
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(QUERY_CONSUMER_OFFSET_VALUE, pRequestHeader);
+
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response)
+    {
+        if (response->getCode() == SUCCESS_VALUE)
+        {
+            QueryConsumerOffsetResponseHeader* ret =
+                (QueryConsumerOffsetResponseHeader*)response->getCommandCustomHeader();
+            // A missing header is reported through the exception below
+            // instead of being dereferenced.
+            if (ret != NULL)
+            {
+                return ret->offset;
+            }
+        }
+        THROW_MQEXCEPTION(MQBrokerException, response->getRemark(), response->getCode());
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "queryConsumerOffset failed", -1);
+    return -1; // not reached; kept from the original after the throw
+}
+
+/**
+ * Sends a synchronous UPDATE_CONSUMER_OFFSET_VALUE request built from
+ * pRequestHeader.
+ *
+ * When a project-group prefix is set, the header's consumerGroup and topic
+ * are rewritten in place first.
+ *
+ * @param addr            target address
+ * @param pRequestHeader  header handed to createRequestCommand
+ * @param timeoutMillis   timeout passed to invokeSync
+ *
+ * Raises MQClientException (via THROW_MQEXCEPTION) when there is no response
+ * or when the response code is not SUCCESS_VALUE.
+ */
+void MQClientAPIImpl::updateConsumerOffset(const std::string& addr,
+        UpdateConsumerOffsetRequestHeader* pRequestHeader,
+        int timeoutMillis)
+{
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        pRequestHeader->consumerGroup =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->consumerGroup, m_projectGroupPrefix);
+        pRequestHeader->topic =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, m_projectGroupPrefix);
+    }
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(UPDATE_CONSUMER_OFFSET_VALUE, pRequestHeader);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+
+    if (!response)
+    {
+        THROW_MQEXCEPTION(MQClientException, "updateConsumerOffset failed", -1);
+    }
+    if (response->getCode() == SUCCESS_VALUE)
+    {
+        return;
+    }
+    THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+}
+
+/**
+ * Sends an UPDATE_CONSUMER_OFFSET_VALUE request built from pRequestHeader
+ * through invokeOneway; no response is examined.
+ *
+ * When a project-group prefix is set, the header's consumerGroup and topic
+ * are rewritten in place first.
+ *
+ * @param addr            target address
+ * @param pRequestHeader  header handed to createRequestCommand
+ * @param timeoutMillis   timeout passed to invokeOneway
+ */
+void MQClientAPIImpl::updateConsumerOffsetOneway(const std::string& addr,
+        UpdateConsumerOffsetRequestHeader* pRequestHeader,
+        int timeoutMillis)
+{
+    const bool hasPrefix = !UtilAll::isBlank(m_projectGroupPrefix);
+    if (hasPrefix)
+    {
+        pRequestHeader->consumerGroup =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->consumerGroup, m_projectGroupPrefix);
+        pRequestHeader->topic =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, m_projectGroupPrefix);
+    }
+
+    RemotingCommandPtr onewayRequest =
+        RemotingCommand::createRequestCommand(UPDATE_CONSUMER_OFFSET_VALUE, pRequestHeader);
+    m_pRemotingClient->invokeOneway(addr, onewayRequest, timeoutMillis);
+}
+
+/**
+ * Encodes pHeartbeatData as the body of a HEART_BEAT_VALUE request and sends
+ * it synchronously to addr.
+ *
+ * When a project-group prefix is set, every consumer group name, every
+ * subscription topic and every producer group name inside pHeartbeatData is
+ * rewritten in place with VirtualEnvUtil::buildWithProjectGroup before
+ * encoding, so the caller's heartbeat object is modified.
+ *
+ * @param addr            target address
+ * @param pHeartbeatData  heartbeat payload (modified when a prefix is set)
+ * @param timeoutMillis   timeout passed to invokeSync
+ *
+ * Raises MQClientException (via THROW_MQEXCEPTION) when there is no response
+ * or when the response code is not SUCCESS_VALUE.
+ */
+void MQClientAPIImpl::sendHearbeat(const std::string& addr, HeartbeatData* pHeartbeatData, int timeoutMillis)
+{
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        // NOTE(review): the set elements are modified in place by casting
+        // away const. If the changed fields take part in the element
+        // ordering this can break the set - confirm against the comparison
+        // operators of ConsumerData/SubscriptionData/ProducerData.
+        std::set<ConsumerData>& consumers = pHeartbeatData->getConsumerDataSet();
+        for (std::set<ConsumerData>::iterator c = consumers.begin(); c != consumers.end(); ++c)
+        {
+            ConsumerData& consumer = const_cast<ConsumerData&>(*c);
+            consumer.groupName =
+                VirtualEnvUtil::buildWithProjectGroup(consumer.groupName, m_projectGroupPrefix);
+
+            std::set<SubscriptionData>& subscriptions = consumer.subscriptionDataSet;
+            for (std::set<SubscriptionData>::iterator s = subscriptions.begin();
+                 s != subscriptions.end(); ++s)
+            {
+                SubscriptionData& subscription = const_cast<SubscriptionData&>(*s);
+                subscription.setTopic(
+                    VirtualEnvUtil::buildWithProjectGroup(subscription.getTopic(), m_projectGroupPrefix));
+            }
+        }
+
+        std::set<ProducerData>& producers = pHeartbeatData->getProducerDataSet();
+        for (std::set<ProducerData>::iterator p = producers.begin(); p != producers.end(); ++p)
+        {
+            ProducerData& producer = const_cast<ProducerData&>(*p);
+            producer.groupName =
+                VirtualEnvUtil::buildWithProjectGroup(producer.groupName, m_projectGroupPrefix);
+        }
+    }
+
+    RemotingCommandPtr request = RemotingCommand::createRequestCommand(HEART_BEAT_VALUE, NULL);
+
+    std::string encoded;
+    pHeartbeatData->encode(encoded);
+    request->setBody((char*)encoded.data(), encoded.length(), true);
+
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (!response)
+    {
+        THROW_MQEXCEPTION(MQClientException, "sendHearbeat failed", -1);
+    }
+    if (response->getCode() == SUCCESS_VALUE)
+    {
+        return;
+    }
+    THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+}
+
+/**
+ * Sends a synchronous UNREGISTER_CLIENT_VALUE request for the given client id
+ * and producer/consumer groups.
+ *
+ * When a project-group prefix is set, both group names are rewritten with
+ * VirtualEnvUtil::buildWithProjectGroup before being sent.
+ *
+ * @param addr           target address
+ * @param clientID       copied into the request header
+ * @param producerGroup  producer group name
+ * @param consumerGroup  consumer group name
+ * @param timeoutMillis  timeout passed to invokeSync
+ *
+ * Raises MQClientException (via THROW_MQEXCEPTION) when there is no response
+ * or when the response code is not SUCCESS_VALUE.
+ */
+void MQClientAPIImpl::unregisterClient(const std::string& addr,
+                                       const std::string& clientID,
+                                       const std::string& producerGroup,
+                                       const std::string& consumerGroup,
+                                       int timeoutMillis)
+{
+    std::string effectiveProducerGroup = producerGroup;
+    std::string effectiveConsumerGroup = consumerGroup;
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        effectiveProducerGroup =
+            VirtualEnvUtil::buildWithProjectGroup(producerGroup, m_projectGroupPrefix);
+        effectiveConsumerGroup =
+            VirtualEnvUtil::buildWithProjectGroup(consumerGroup, m_projectGroupPrefix);
+    }
+
+    UnregisterClientRequestHeader* header = new UnregisterClientRequestHeader();
+    header->clientID = clientID;
+    header->producerGroup = effectiveProducerGroup;
+    header->consumerGroup = effectiveConsumerGroup;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(UNREGISTER_CLIENT_VALUE, header);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+
+    if (!response)
+    {
+        THROW_MQEXCEPTION(MQClientException, "unregisterClient failed", -1);
+    }
+    if (response->getCode() == SUCCESS_VALUE)
+    {
+        return;
+    }
+    THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+}
+
+/**
+ * Not implemented: the body is empty, no request is sent and all parameters
+ * (including pRequestHeader) are ignored.
+ */
+void MQClientAPIImpl::endTransactionOneway(const std::string& addr,
+        EndTransactionRequestHeader* pRequestHeader,
+        const std::string& remark,
+        int timeoutMillis)
+{
+    // TODO: not implemented yet.
+}
+
+/**
+ * Sends a QUERY_MESSAGE_VALUE request built from pRequestHeader through
+ * invokeAsync; the outcome is delivered to pInvokeCallback.
+ *
+ * When a project-group prefix is set, the header's topic is rewritten in
+ * place first.
+ *
+ * @param addr             target address
+ * @param pRequestHeader   header handed to createRequestCommand
+ * @param timeoutMillis    timeout passed to invokeAsync
+ * @param pInvokeCallback  callback passed to invokeAsync
+ */
+void MQClientAPIImpl::queryMessage(const std::string& addr,
+                                   QueryMessageRequestHeader* pRequestHeader,
+                                   int timeoutMillis,
+                                   InvokeCallback* pInvokeCallback)
+{
+    const bool hasPrefix = !UtilAll::isBlank(m_projectGroupPrefix);
+    if (hasPrefix)
+    {
+        pRequestHeader->topic =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, m_projectGroupPrefix);
+    }
+
+    RemotingCommandPtr queryRequest =
+        RemotingCommand::createRequestCommand(QUERY_MESSAGE_VALUE, pRequestHeader);
+    m_pRemotingClient->invokeAsync(addr, queryRequest, timeoutMillis, pInvokeCallback);
+}
+
+/**
+ * Encodes heartbeat as the body of a HEART_BEAT_VALUE request, sends it
+ * synchronously to addr and reports whether the broker answered with
+ * SUCCESS_VALUE.
+ *
+ * When a project-group prefix is set, every consumer group name, every
+ * subscription topic and every producer group name inside heartbeat is
+ * rewritten in place with VirtualEnvUtil::buildWithProjectGroup before
+ * encoding, so the caller's heartbeat object is modified.
+ *
+ * @param addr           target address
+ * @param heartbeat      heartbeat payload (modified when a prefix is set)
+ * @param timeoutMillis  timeout passed to invokeSync
+ * @return true when a response arrived and its code is SUCCESS_VALUE
+ */
+bool MQClientAPIImpl::registerClient(const std::string& addr, HeartbeatData& heartbeat, int timeoutMillis)
+{
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        // NOTE(review): the set elements are modified in place by casting
+        // away const. If the changed fields take part in the element
+        // ordering this can break the set - confirm against the comparison
+        // operators of ConsumerData/SubscriptionData/ProducerData.
+        std::set<ConsumerData>& consumers = heartbeat.getConsumerDataSet();
+        for (std::set<ConsumerData>::iterator c = consumers.begin(); c != consumers.end(); ++c)
+        {
+            ConsumerData& consumer = const_cast<ConsumerData&>(*c);
+            consumer.groupName =
+                VirtualEnvUtil::buildWithProjectGroup(consumer.groupName, m_projectGroupPrefix);
+
+            std::set<SubscriptionData>& subscriptions = consumer.subscriptionDataSet;
+            for (std::set<SubscriptionData>::iterator s = subscriptions.begin();
+                 s != subscriptions.end(); ++s)
+            {
+                SubscriptionData& subscription = const_cast<SubscriptionData&>(*s);
+                subscription.setTopic(
+                    VirtualEnvUtil::buildWithProjectGroup(subscription.getTopic(), m_projectGroupPrefix));
+            }
+        }
+
+        std::set<ProducerData>& producers = heartbeat.getProducerDataSet();
+        for (std::set<ProducerData>::iterator p = producers.begin(); p != producers.end(); ++p)
+        {
+            ProducerData& producer = const_cast<ProducerData&>(*p);
+            producer.groupName =
+                VirtualEnvUtil::buildWithProjectGroup(producer.groupName, m_projectGroupPrefix);
+        }
+    }
+
+    RemotingCommandPtr request = RemotingCommand::createRequestCommand(HEART_BEAT_VALUE, NULL);
+
+    std::string encoded;
+    heartbeat.encode(encoded);
+    request->setBody((char*)encoded.data(), encoded.length(), true);
+
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (!response)
+    {
+        return false;
+    }
+    return response->getCode() == SUCCESS_VALUE;
+}
+
+/**
+ * Sends a synchronous CONSUMER_SEND_MSG_BACK_VALUE request for msg.
+ *
+ * The request goes to addr, or - when addr is empty - to the address derived
+ * from msg's store host. When a project-group prefix is set, the consumer
+ * group is rewritten and msg's topic is rewritten in place.
+ *
+ * @param addr           target address; may be empty
+ * @param msg            message whose commit log offset is sent (its topic is
+ *                       modified when a prefix is set)
+ * @param consumerGroup  consumer group name
+ * @param delayLevel     copied into the request header
+ * @param timeoutMillis  timeout passed to invokeSync
+ *
+ * Raises MQClientException (via THROW_MQEXCEPTION) when there is no response
+ * or when the response code is not SUCCESS_VALUE.
+ */
+void MQClientAPIImpl::consumerSendMessageBack(
+        const std::string& addr,
+        MessageExt& msg,
+        const std::string& consumerGroup,
+        int delayLevel,
+        int timeoutMillis)
+{
+    std::string effectiveGroup = consumerGroup;
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        effectiveGroup = VirtualEnvUtil::buildWithProjectGroup(consumerGroup, m_projectGroupPrefix);
+        msg.setTopic(VirtualEnvUtil::buildWithProjectGroup(msg.getTopic(), m_projectGroupPrefix));
+    }
+
+    ConsumerSendMsgBackRequestHeader* header = new ConsumerSendMsgBackRequestHeader();
+    header->group = effectiveGroup;
+    header->offset = msg.getCommitLogOffset();
+    header->delayLevel = delayLevel;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(CONSUMER_SEND_MSG_BACK_VALUE, header);
+
+    // Fall back to the message's store host when no address was given.
+    std::string brokerAddr = addr;
+    if (brokerAddr.empty())
+    {
+        brokerAddr = socketAddress2IPPort(msg.getStoreHost());
+    }
+
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(brokerAddr, request, timeoutMillis);
+    if (!response)
+    {
+        THROW_MQEXCEPTION(MQClientException, "consumerSendMessageBack failed", -1);
+    }
+    if (response->getCode() == SUCCESS_VALUE)
+    {
+        return;
+    }
+    THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+}
+
+/**
+ * Encodes pRequestBody as the body of a LOCK_BATCH_MQ_VALUE request, sends it
+ * synchronously and returns the set of queues the response reports as locked.
+ *
+ * When a project-group prefix is set, the request's consumer group and queue
+ * topics are rewritten in place before encoding, and the prefix is cleared
+ * from the topics of the returned queues.
+ *
+ * @param addr           target address
+ * @param pRequestBody   lock request (modified when a prefix is set)
+ * @param timeoutMillis  timeout passed to invokeSync
+ * @return the lock-OK queue set from the decoded response body
+ *
+ * Raises MQClientException (via THROW_MQEXCEPTION) when there is no response,
+ * when the response code is not SUCCESS_VALUE, or when the response body
+ * cannot be decoded.
+ */
+std::set<MessageQueue> MQClientAPIImpl::lockBatchMQ(const std::string& addr,
+        LockBatchRequestBody* pRequestBody,
+        int timeoutMillis)
+{
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        pRequestBody->setConsumerGroup(VirtualEnvUtil::buildWithProjectGroup(
+                                           pRequestBody->getConsumerGroup(), m_projectGroupPrefix));
+
+        // NOTE(review): the request's queue set is still modified in place
+        // through a const cast; if the topic takes part in MessageQueue
+        // ordering this can break the set - confirm.
+        std::set<MessageQueue>& requestQueues = pRequestBody->getMqSet();
+        for (std::set<MessageQueue>::iterator it = requestQueues.begin();
+             it != requestQueues.end(); ++it)
+        {
+            MessageQueue& messageQueue = const_cast<MessageQueue&>(*it);
+            messageQueue.setTopic(VirtualEnvUtil::buildWithProjectGroup(messageQueue.getTopic(),
+                                                                        m_projectGroupPrefix));
+        }
+    }
+
+    RemotingCommandPtr request = RemotingCommand::createRequestCommand(LOCK_BATCH_MQ_VALUE, NULL);
+
+    std::string body;
+    pRequestBody->encode(body);
+    request->setBody((char*)body.data(), body.length(), true);
+
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response)
+    {
+        if (response->getCode() == SUCCESS_VALUE)
+        {
+            LockBatchResponseBody* responseBody =
+                LockBatchResponseBody::decode(response->getBody(), response->getBodyLen());
+            // A NULL decode result falls through to the exception below
+            // instead of being dereferenced.
+            if (responseBody != NULL)
+            {
+                // Copy the result out, then release the decoded body so it is
+                // not leaked (mirrors getConsumerIdListByGroup).
+                std::set<MessageQueue> lockedQueues = responseBody->getLockOKMQSet();
+                delete responseBody;
+
+                if (!UtilAll::isBlank(m_projectGroupPrefix))
+                {
+                    // Build a fresh set from modified copies rather than
+                    // editing set elements in place.
+                    std::set<MessageQueue> cleanedQueues;
+                    for (std::set<MessageQueue>::const_iterator it = lockedQueues.begin();
+                         it != lockedQueues.end(); ++it)
+                    {
+                        MessageQueue messageQueue(*it);
+                        messageQueue.setTopic(VirtualEnvUtil::clearProjectGroup(messageQueue.getTopic(),
+                                                                                m_projectGroupPrefix));
+                        cleanedQueues.insert(messageQueue);
+                    }
+                    lockedQueues.swap(cleanedQueues);
+                }
+                return lockedQueues;
+            }
+        }
+
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "lockBatchMQ failed", -1);
+}
+
+/**
+ * Send an UNLOCK_BATCH_MQ_VALUE request to the broker at `addr`.
+ *
+ * When m_projectGroupPrefix is not blank, the consumer group and the topic of every queue in
+ * `pRequestBody` are rewritten with VirtualEnvUtil::buildWithProjectGroup before the request is
+ * encoded (the caller's request body is modified).
+ *
+ * @param addr           address handed to the remoting client
+ * @param pRequestBody   request body to encode; dereferenced unconditionally, must not be NULL
+ * @param timeoutMillis  timeout handed to the remoting client
+ * @param oneway         true: send with invokeOneway and do not look at any response;
+ *                       false: send with invokeSync and check the response code
+ * @throws MQClientException (sync mode only) when there is no response or the response code
+ *         is not SUCCESS_VALUE
+ */
+void MQClientAPIImpl::unlockBatchMQ(const std::string& addr,
+                                    UnlockBatchRequestBody* pRequestBody,
+                                    int timeoutMillis,
+                                    bool oneway)
+{
+    if (!UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        pRequestBody->setConsumerGroup(VirtualEnvUtil::buildWithProjectGroup(
+                                           pRequestBody->getConsumerGroup(), m_projectGroupPrefix));
+
+        // Elements of a std::set are its keys. Changing a key in place (the previous code cast
+        // away const to do so) can leave the set mis-ordered if the topic takes part in
+        // MessageQueue's comparison, so build a fresh set and swap it in instead.
+        std::set<MessageQueue>& messageQueues = pRequestBody->getMqSet();
+        std::set<MessageQueue> prefixedQueues;
+        for (std::set<MessageQueue>::const_iterator it = messageQueues.begin();
+             it != messageQueues.end(); ++it)
+        {
+            MessageQueue messageQueue(*it);
+            messageQueue.setTopic(VirtualEnvUtil::buildWithProjectGroup(messageQueue.getTopic(),
+                                  m_projectGroupPrefix));
+            prefixedQueues.insert(messageQueue);
+        }
+        messageQueues.swap(prefixedQueues);
+    }
+
+    RemotingCommandPtr request = RemotingCommand::createRequestCommand(UNLOCK_BATCH_MQ_VALUE, NULL);
+
+    std::string body;
+    pRequestBody->encode(body);
+    request->setBody((char*)body.data(), body.length(), true);
+
+    if (oneway)
+    {
+        m_pRemotingClient->invokeOneway(addr, request, timeoutMillis);
+    }
+    else
+    {
+        RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+        if (response)
+        {
+            switch (response->getCode())
+            {
+                case SUCCESS_VALUE:
+                    return;
+                default:
+                    break;
+            }
+
+            THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+        }
+
+        THROW_MQEXCEPTION(MQClientException, "unlockBatchMQ failed", -1);
+    }
+}
+
+/**
+ * Stub: not implemented yet. All arguments are ignored and a default-constructed
+ * TopicStatsTable is returned.
+ */
+TopicStatsTable MQClientAPIImpl::getTopicStatsInfo(const std::string& addr,
+        const std::string& topic,
+        int timeoutMillis)
+{
+    // TODO: implement
+    return TopicStatsTable();
+}
+
+/**
+ * Stub: not implemented yet. All arguments are ignored and a default-constructed
+ * ConsumeStats is returned.
+ */
+ConsumeStats MQClientAPIImpl::getConsumeStats(const std::string& addr,
+        const std::string& consumerGroup,
+        int timeoutMillis)
+{
+    // TODO: implement
+    return ConsumeStats();
+}
+
+/**
+ * Stub: not implemented yet. All arguments are ignored and NULL is always returned.
+ */
+ProducerConnection* MQClientAPIImpl::getProducerConnectionList(const 
std::string& addr,
+        const std::string& producerGroup,
+        int timeoutMillis)
+{
+    //TODO
+    return NULL;
+}
+
+/**
+ * Stub: not implemented yet. All arguments are ignored and NULL is always returned.
+ */
+ConsumerConnection* MQClientAPIImpl::getConsumerConnectionList(const 
std::string& addr,
+        const std::string& consumerGroup,
+        int timeoutMillis)
+{
+    //TODO
+    return NULL;
+}
+
+/**
+ * Stub: not implemented yet. Both arguments are ignored and a default-constructed
+ * KVTable is returned.
+ */
+KVTable MQClientAPIImpl::getBrokerRuntimeInfo(const std::string& addr,  int timeoutMillis)
+{
+    // TODO: implement
+    return KVTable();
+}
+
+/**
+ * Stub: not implemented yet. All arguments are ignored and nothing is sent.
+ */
+void MQClientAPIImpl::updateBrokerConfig(const std::string& addr,
+        const std::map<std::string, std::string>&  properties,
+        int timeoutMillis)
+{
+    //TODO
+}
+
+/**
+ * Stub: not implemented yet. The timeout is ignored and NULL is always returned.
+ */
+ClusterInfo* MQClientAPIImpl::getBrokerClusterInfo(int timeoutMillis)
+{
+   //TODO
+    return NULL;
+}
+
+/**
+ * Request route data for `topic` with a GET_ROUTEINTO_BY_TOPIC_VALUE command sent through
+ * invokeSync with an empty address. The topic is sent as given (no project-group rewriting).
+ *
+ * @param topic          topic name placed in the request header
+ * @param timeoutMillis  timeout handed to invokeSync
+ * @return the object produced by TopicRouteData::encode from the response body on
+ *         SUCCESS_VALUE, or NULL when invokeSync yields no response
+ * @throws MQClientException for every other response code, and for SUCCESS_VALUE with a
+ *         NULL body
+ */
+TopicRouteData* MQClientAPIImpl::getDefaultTopicRouteInfoFromNameServer(const std::string& topic,
+        int timeoutMillis)
+{
+    GetRouteInfoRequestHeader* routeHeader = new GetRouteInfoRequestHeader();
+    routeHeader->topic = topic;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(GET_ROUTEINTO_BY_TOPIC_VALUE, routeHeader);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync("", request, timeoutMillis);
+    if (response)
+    {
+        // Only a successful reply that actually carries a body yields route data;
+        // everything else (including TOPIC_NOT_EXIST_VALUE) ends in the exception below.
+        if (response->getCode() == SUCCESS_VALUE)
+        {
+            int bodyLen = response->getBodyLen();
+            const char* body = response->getBody();
+            if (body)
+            {
+                return TopicRouteData::encode(body, bodyLen);
+            }
+        }
+
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    return NULL;
+}
+
+/**
+ * Request route data for `topic` with a GET_ROUTEINTO_BY_TOPIC_VALUE command sent through
+ * invokeSync with an empty address. When m_projectGroupPrefix is not blank the topic is first
+ * rewritten with VirtualEnvUtil::buildWithProjectGroup.
+ *
+ * @param topic          topic name as seen by the caller
+ * @param timeoutMillis  timeout handed to invokeSync
+ * @return the object produced by TopicRouteData::encode from the response body on
+ *         SUCCESS_VALUE, or NULL when invokeSync yields no response
+ * @throws MQClientException for every other response code (TOPIC_NOT_EXIST_VALUE is also
+ *         logged unless the topic is MixAll::DEFAULT_TOPIC), and for SUCCESS_VALUE with a
+ *         NULL body
+ */
+TopicRouteData* MQClientAPIImpl::getTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis)
+{
+    GetRouteInfoRequestHeader* routeHeader = new GetRouteInfoRequestHeader();
+    if (UtilAll::isBlank(m_projectGroupPrefix))
+    {
+        routeHeader->topic = topic;
+    }
+    else
+    {
+        routeHeader->topic = VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix);
+    }
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(GET_ROUTEINTO_BY_TOPIC_VALUE, routeHeader);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync("", request, timeoutMillis);
+    if (response)
+    {
+        if (response->getCode() == TOPIC_NOT_EXIST_VALUE)
+        {
+            // The warning uses the caller's topic name, not the rewritten one.
+            if (topic != MixAll::DEFAULT_TOPIC)
+            {
+                RMQ_WARN("get Topic [{%s}] RouteInfoFromNameServer is not exist value", topic.c_str());
+            }
+        }
+        else if (response->getCode() == SUCCESS_VALUE)
+        {
+            int bodyLen = response->getBodyLen();
+            const char* body = response->getBody();
+            if (body)
+            {
+                return TopicRouteData::encode(body, bodyLen);
+            }
+        }
+
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    return NULL;
+}
+
+/**
+ * Fetch the topic list with a GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE command sent through
+ * invokeSync with an empty address. When m_projectGroupPrefix is not blank every returned
+ * topic name is passed through VirtualEnvUtil::clearProjectGroup.
+ *
+ * @param timeoutMillis  timeout handed to invokeSync
+ * @return the object produced by TopicList::decode on SUCCESS_VALUE, or NULL when invokeSync
+ *         yields no response
+ * @throws MQClientException for every other response code, and for SUCCESS_VALUE with a
+ *         NULL body
+ */
+TopicList* MQClientAPIImpl::getTopicListFromNameServer(int timeoutMillis)
+{
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE, NULL);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync("", request, timeoutMillis);
+    if (response)
+    {
+        if (response->getCode() == SUCCESS_VALUE)
+        {
+            char* payload = (char*)response->getBody();
+            if (payload != NULL)
+            {
+                TopicList* decoded = TopicList::decode(payload, response->getBodyLen());
+
+                if (!UtilAll::isBlank(m_projectGroupPrefix))
+                {
+                    // Build the cleaned names in a separate set, then replace the list at once.
+                    std::set<std::string> cleanedTopics;
+                    const std::set<std::string>& rawTopics = decoded->getTopicList();
+                    for (std::set<std::string>::const_iterator cur = rawTopics.begin();
+                         cur != rawTopics.end(); ++cur)
+                    {
+                        std::string name = *cur;
+                        cleanedTopics.insert(VirtualEnvUtil::clearProjectGroup(name, m_projectGroupPrefix));
+                    }
+
+                    decoded->setTopicList(cleanedTopics);
+                }
+
+                return decoded;
+            }
+        }
+
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    return NULL;
+}
+
+/**
+ * Stub: not implemented yet. All arguments are ignored and 0 is always returned.
+ */
+int MQClientAPIImpl::wipeWritePermOfBroker(const std::string& namesrvAddr,
+        const std::string& brokerName,
+        int timeoutMillis)
+{
+    //TODO
+    return 0;
+}
+
+/**
+ * Stub: not implemented yet. All arguments are ignored and nothing is sent.
+ */
+void MQClientAPIImpl::deleteTopicInBroker(const std::string& addr,
+        const std::string& topic,
+        int timeoutMillis)
+{
+    //TODO
+}
+
+/**
+ * Stub: not implemented yet. All arguments are ignored and nothing is sent.
+ */
+void MQClientAPIImpl::deleteTopicInNameServer(const std::string& addr,
+        const std::string& topic,
+        int timeoutMillis)
+{
+    //TODO
+}
+
+/**
+ * Stub: not implemented yet. All arguments are ignored and nothing is sent.
+ */
+void MQClientAPIImpl::deleteSubscriptionGroup(const std::string& addr,
+        const std::string& groupName,
+        int timeoutMillis)
+{
+    //TODO
+}
+
+/**
+ * Look up a key/value config entry with a GET_KV_CONFIG_VALUE command sent through
+ * invokeSync with an empty address.
+ *
+ * @param projectNamespace  stored in the request header's namespace_ field
+ * @param key               stored in the request header's key field
+ * @param timeoutMillis     timeout handed to invokeSync
+ * @return the `value` field of the response's custom header on SUCCESS_VALUE
+ * @throws MQClientException when there is no response or the response code is not
+ *         SUCCESS_VALUE
+ */
+std::string MQClientAPIImpl::getKVConfigValue(const std::string& projectNamespace,
+        const std::string& key,
+        int timeoutMillis)
+{
+    GetKVConfigRequestHeader* kvRequestHeader = new GetKVConfigRequestHeader();
+    kvRequestHeader->namespace_ = projectNamespace;
+    kvRequestHeader->key = key;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(GET_KV_CONFIG_VALUE, kvRequestHeader);
+
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync("", request, timeoutMillis);
+    if (response)
+    {
+        if (response->getCode() == SUCCESS_VALUE)
+        {
+            // NOTE(review): the header pointer is used without a NULL check.
+            GetKVConfigResponseHeader* kvResponseHeader =
+                (GetKVConfigResponseHeader*)response->getCommandCustomHeader();
+            return kvResponseHeader->value;
+        }
+
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "getKVConfigValue failed", -1);
+}
+
+/**
+ * Stub: not implemented yet. All arguments are ignored and nothing is sent.
+ */
+void MQClientAPIImpl::putKVConfigValue(const std::string& projectNamespace,
+                                       const std::string& key,
+                                       const std::string& value,
+                                       int timeoutMillis)
+{
+    //TODO
+}
+
+/**
+ * Stub: not implemented yet. All arguments are ignored and nothing is sent.
+ */
+void MQClientAPIImpl::deleteKVConfigValue(const std::string& projectNamespace,
+        const std::string& key,
+        int timeoutMillis)
+{
+    //TODO
+}
+
+/**
+ * Convenience wrapper: looks up `ip` as a key in the NamesrvUtil::NAMESPACE_PROJECT_CONFIG
+ * namespace via getKVConfigValue and returns whatever that call returns (or throws).
+ */
+std::string MQClientAPIImpl::getProjectGroupByIp(const std::string& ip,  int timeoutMillis)
+{
+    std::string projectGroup =
+        getKVConfigValue(NamesrvUtil::NAMESPACE_PROJECT_CONFIG, ip, timeoutMillis);
+    return projectGroup;
+}
+
+/**
+ * Stub: not implemented yet. All arguments are ignored and an empty string is returned.
+ */
+std::string MQClientAPIImpl::getKVConfigByValue(const std::string& projectNamespace,
+        const std::string& projectGroup,
+        int timeoutMillis)
+{
+    // TODO: implement
+    return std::string();
+}
+
+/**
+ * Stub: not implemented yet. Both arguments are ignored and a value-initialized KVTable
+ * is returned.
+ */
+KVTable MQClientAPIImpl::getKVListByNamespace(const std::string& 
projectNamespace,  int timeoutMillis)
+{
+    //TODO
+    return KVTable();
+}
+
+/**
+ * Stub: not implemented yet. All arguments are ignored and nothing is sent.
+ */
+void MQClientAPIImpl::deleteKVConfigByValue(const std::string& 
projectNamespace,
+        const std::string& projectGroup,
+        int timeoutMillis)
+{
+    //TODO
+}
+
+/**
+ * Send an already-built request synchronously and turn the reply into a SendResult.
+ *
+ * @param addr           address handed to invokeSync
+ * @param brokerName     forwarded to processSendResponse
+ * @param msg            message whose topic is forwarded to processSendResponse
+ * @param timeoutMillis  timeout handed to invokeSync
+ * @param request        request command to send
+ * @return whatever processSendResponse returns (NULL when there is no response)
+ * @throws MQClientException as raised by processSendResponse
+ */
+SendResult* MQClientAPIImpl::sendMessageSync(const std::string& addr,
+        const std::string& brokerName,
+        Message& msg,
+        int timeoutMillis,
+        RemotingCommand* request)
+{
+    RemotingCommandPtr reply = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    SendResult* sendResult = processSendResponse(brokerName, msg.getTopic(), reply);
+    return sendResult;
+}
+
+/**
+ * Send an already-built request asynchronously. A new ProducerInvokeCallback wrapping
+ * `pSendCallback`, this object, the message topic and `brokerName` is allocated and handed
+ * to invokeAsync.
+ * NOTE(review): the callback is allocated with new and not released here - presumably the
+ * remoting client takes ownership; confirm.
+ */
+void MQClientAPIImpl::sendMessageAsync(const std::string& addr,
+                                       const std::string& brokerName,
+                                       Message& msg,
+                                       int timeoutMillis,
+                                       RemotingCommand* request,
+                                       SendCallback* pSendCallback)
+{
+    m_pRemotingClient->invokeAsync(
+        addr,
+        request,
+        timeoutMillis,
+        new ProducerInvokeCallback(pSendCallback, this, msg.getTopic(), brokerName));
+}
+
+/**
+ * Convert a send response into a SendResult.
+ *
+ * The response code is mapped to a SendStatus (SUCCESS_VALUE -> SEND_OK,
+ * FLUSH_DISK_TIMEOUT_VALUE -> FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT_VALUE ->
+ * FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE_VALUE -> SLAVE_NOT_AVAILABLE); queue id, message id
+ * and queue offset are read from the response's custom header.
+ *
+ * @param brokerName  broker name used to build the result's MessageQueue
+ * @param topic       topic used to build the result's MessageQueue
+ * @param pResponse   response to interpret; may be NULL
+ * @return NULL when pResponse is NULL, otherwise a SendResult allocated with new
+ * @throws MQClientException (remark and code of the response) for any other response code
+ */
+SendResult* MQClientAPIImpl::processSendResponse(const std::string& brokerName,
+        const std::string& topic,
+        RemotingCommand* pResponse)
+{
+    if (pResponse == NULL)
+    {
+        return NULL;
+    }
+
+    SendStatus sendStatus = SEND_OK;
+    switch (pResponse->getCode())
+    {
+        case SUCCESS_VALUE:
+            sendStatus = SEND_OK;
+            break;
+        // The three codes below still produce a result, only with a non-OK status.
+        // TODO LOG
+        case FLUSH_DISK_TIMEOUT_VALUE:
+            sendStatus = FLUSH_DISK_TIMEOUT;
+            break;
+        case FLUSH_SLAVE_TIMEOUT_VALUE:
+            sendStatus = FLUSH_SLAVE_TIMEOUT;
+            break;
+        case SLAVE_NOT_AVAILABLE_VALUE:
+            sendStatus = SLAVE_NOT_AVAILABLE;
+            break;
+        default:
+            THROW_MQEXCEPTION(MQClientException, pResponse->getRemark(), pResponse->getCode());
+    }
+
+    SendMessageResponseHeader* header =
+        (SendMessageResponseHeader*)pResponse->getCommandCustomHeader();
+    MessageQueue targetQueue(topic, brokerName, header->queueId);
+    return new SendResult(sendStatus, header->msgId, targetQueue,
+                          header->queueOffset, m_projectGroupPrefix);
+}
+
+/**
+ * Send an already-built pull request asynchronously. A new ConsumerInvokeCallback wrapping
+ * `pPullCallback` and this object is allocated and handed to invokeAsync.
+ * NOTE(review): the callback is allocated with new and not released here - presumably the
+ * remoting client takes ownership; confirm.
+ */
+void MQClientAPIImpl::pullMessageAsync(const std::string& addr,
+                                       RemotingCommand* pRequest,
+                                       int timeoutMillis,
+                                       PullCallback* pPullCallback)
+{
+    m_pRemotingClient->invokeAsync(
+        addr,
+        pRequest,
+        timeoutMillis,
+        new ConsumerInvokeCallback(pPullCallback, this));
+}
+
+/**
+ * Convert a pull response into a PullResultExt.
+ *
+ * The response code is mapped to a PullStatus (SUCCESS_VALUE -> FOUND, PULL_NOT_FOUND_VALUE ->
+ * NO_NEW_MSG, PULL_RETRY_IMMEDIATELY_VALUE -> NO_MATCHED_MSG, PULL_OFFSET_MOVED_VALUE ->
+ * OFFSET_ILLEGAL). Offsets and the suggested broker id come from the response's custom header;
+ * the raw body pointer and length of the response are passed on to the result together with an
+ * empty message list.
+ *
+ * @param pResponse  response to interpret; dereferenced unconditionally, must not be NULL
+ * @return a PullResultExt allocated with new
+ * @throws MQBrokerException (remark and code of the response) for any other response code
+ */
+PullResult* MQClientAPIImpl::processPullResponse(RemotingCommand* pResponse)
+{
+    PullStatus pullStatus = NO_NEW_MSG;
+    const int responseCode = pResponse->getCode();
+    if (responseCode == SUCCESS_VALUE)
+    {
+        pullStatus = FOUND;
+    }
+    else if (responseCode == PULL_NOT_FOUND_VALUE)
+    {
+        pullStatus = NO_NEW_MSG;
+    }
+    else if (responseCode == PULL_RETRY_IMMEDIATELY_VALUE)
+    {
+        pullStatus = NO_MATCHED_MSG;
+    }
+    else if (responseCode == PULL_OFFSET_MOVED_VALUE)
+    {
+        pullStatus = OFFSET_ILLEGAL;
+    }
+    else
+    {
+        THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(), pResponse->getCode());
+    }
+
+    PullMessageResponseHeader* header =
+        (PullMessageResponseHeader*) pResponse->getCommandCustomHeader();
+    std::list<MessageExt*> noMessagesYet;
+    return new PullResultExt(pullStatus, header->nextBeginOffset,
+                             header->minOffset, header->maxOffset, noMessagesYet,
+                             header->suggestWhichBrokerId,
+                             pResponse->getBody(), pResponse->getBodyLen());
+}
+
+/**
+ * Send an already-built pull request synchronously and convert the reply with
+ * processPullResponse.
+ *
+ * @param addr           address handed to invokeSync
+ * @param pRequest       request command to send
+ * @param timeoutMillis  timeout handed to invokeSync
+ * @return the PullResult built by processPullResponse
+ * @throws MQClientException when invokeSync yields no response
+ * @throws MQBrokerException as raised by processPullResponse for unexpected response codes
+ */
+PullResult* MQClientAPIImpl::pullMessageSync(const std::string& addr,
+        RemotingCommand* pRequest,
+        int timeoutMillis)
+{
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, pRequest, timeoutMillis);
+    if (response)
+    {
+        PullResult* result = processPullResponse(response);
+
+        // processPullResponse hands the response's raw body pointer to the result, so the
+        // body is reset on the response afterwards - presumably so the response no longer
+        // frees a buffer the result still uses; confirm against RemotingCommand::setBody.
+        response->setBody(NULL, 0, false);
+        return result;
+    }
+
+    // Every other synchronous call in this file guards against an empty response; without
+    // this guard a missing reply was dereferenced.
+    THROW_MQEXCEPTION(MQClientException, "pullMessageSync failed", -1);
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQClientAPIImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQClientAPIImpl.h 
b/rocketmq-client4cpp/src/MQClientAPIImpl.h
new file mode 100755
index 0000000..88defb5
--- /dev/null
+++ b/rocketmq-client4cpp/src/MQClientAPIImpl.h
@@ -0,0 +1,280 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MQCLIENTAPIIMPL_H__
+#define __MQCLIENTAPIIMPL_H__
+
+#include <list>
+#include <map>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "ClientConfig.h"
+#include "RemoteClientConfig.h"
+#include "SubscriptionGroupConfig.h"
+#include "TopicConfig.h"
+#include "ConsumeStats.h"
+#include "TopicStatsTable.h"
+#include "KVTable.h"
+#include "TopicRouteData.h"
+#include "SendResult.h"
+#include "PullResult.h"
+#include "MessageExt.h"
+#include "CommunicationMode.h"
+#include "TopAddressing.h"
+#include "HeartbeatData.h"
+#include "LockBatchBody.h"
+
+namespace rmq
+{
+class ClientConfig;
+class TcpRemotingClient;
+class QueryConsumerOffsetRequestHeader;
+class UpdateConsumerOffsetRequestHeader;
+class EndTransactionRequestHeader;
+class SendMessageRequestHeader;
+class PullMessageRequestHeader;
+class QueryMessageRequestHeader;
+class ProducerConnection;
+class ConsumerConnection;
+class ClusterInfo;
+class TopicList;
+class InvokeCallback;
+class RemotingCommand;
+class PullCallback;
+class SendCallback;
+class ClientRemotingProcessor;
+
+/**
+ * Client-side request API. The implementation file builds RemotingCommand requests and sends
+ * them through the TcpRemotingClient held in m_pRemotingClient, translating response codes
+ * into return values or exceptions.
+ *
+ * Several of the methods declared here are still TODO stubs in MQClientAPIImpl.cpp (for
+ * example getTopicStatsInfo, getConsumeStats, updateBrokerConfig, the delete* methods and
+ * most KV-config methods); they ignore their arguments and return an empty/NULL/0 value.
+ */
+class MQClientAPIImpl
+{
+  public:
+      // The constructor allocates the TcpRemotingClient from remoteClientConfig.
+      MQClientAPIImpl(ClientConfig& clientConfig,
+                                         const RemoteClientConfig& 
remoteClientConfig,
+                      ClientRemotingProcessor* pClientRemotingProcessor);
+      ~MQClientAPIImpl();
+
+      void start();
+      void shutdown();
+
+      std::string getProjectGroupPrefix();
+      std::vector<std::string> getNameServerAddressList();
+      void updateNameServerAddressList(const std::string& addrs);
+      std::string fetchNameServerAddr();
+
+      void createSubscriptionGroup(const std::string& addr,
+                                   SubscriptionGroupConfig config,
+                                   int timeoutMillis);
+
+      void createTopic(const std::string& addr,
+                       const std::string& defaultTopic,
+                       TopicConfig topicConfig,
+                       int timeoutMillis);
+
+      SendResult sendMessage(const std::string& addr,
+                             const std::string& brokerName,
+                             Message& msg,
+                             SendMessageRequestHeader* pRequestHeader,
+                             int timeoutMillis,
+                             CommunicationMode communicationMode,
+                             SendCallback* pSendCallback);
+
+      PullResult* pullMessage(const std::string& addr,
+                              PullMessageRequestHeader* pRequestHeader,
+                              int timeoutMillis,
+                              CommunicationMode communicationMode,
+                              PullCallback* pPullCallback);
+
+      MessageExt* viewMessage(const std::string& addr, long long phyoffset, 
int timeoutMillis);
+
+
+      long long searchOffset(const std::string& addr,
+                             const std::string& topic,
+                             int queueId,
+                             long long timestamp,
+                             int timeoutMillis);
+
+      long long getMaxOffset(const std::string& addr,
+                             const std::string& topic,
+                             int queueId,
+                             int timeoutMillis);
+
+      std::list<std::string> getConsumerIdListByGroup(const std::string& addr,
+              const std::string& consumerGroup,
+              int timeoutMillis);
+
+      long long getMinOffset(const std::string& addr,
+                             const std::string& topic,
+                             int queueId,
+                             int timeoutMillis);
+
+      long long getEarliestMsgStoretime(const std::string& addr,
+                                        const std::string& topic,
+                                        int queueId,
+                                        int timeoutMillis);
+
+      long long queryConsumerOffset(const std::string& addr,
+                                    QueryConsumerOffsetRequestHeader* 
pRequestHeader,
+                                    int timeoutMillis);
+
+      void updateConsumerOffset(const std::string& addr,
+                                UpdateConsumerOffsetRequestHeader* 
pRequestHeader,
+                                int timeoutMillis);
+
+      void updateConsumerOffsetOneway(const std::string& addr,
+                                      UpdateConsumerOffsetRequestHeader* 
pRequestHeader,
+                                      int timeoutMillis);
+
+      void sendHearbeat(const std::string& addr, HeartbeatData* 
pHeartbeatData, int timeoutMillis);
+
+      void unregisterClient(const std::string& addr,
+                            const std::string& clientID,
+                            const std::string& producerGroup,
+                            const std::string& consumerGroup,
+                            int timeoutMillis);
+
+      void endTransactionOneway(const std::string& addr,
+                                EndTransactionRequestHeader* pRequestHeader,
+                                const std::string& remark,
+                                int timeoutMillis);
+
+      void queryMessage(const std::string& addr,
+                        QueryMessageRequestHeader* pRequestHeader,
+                        int timeoutMillis,
+                        InvokeCallback* pInvokeCallback);
+
+      bool registerClient(const std::string& addr,
+                          HeartbeatData& heartbeat,
+                          int timeoutMillis);
+
+      void consumerSendMessageBack(const std::string& addr,
+                                                          MessageExt& msg,
+                                   const std::string& consumerGroup,
+                                   int delayLevel,
+                                   int timeoutMillis);
+
+      // Returns the lock-OK queue set from the response; throws MQClientException on failure.
+      // May rewrite the consumer group and topics inside *pRequestBody (project-group prefix).
+      std::set<MessageQueue> lockBatchMQ(const std::string& addr,
+                                         LockBatchRequestBody* pRequestBody,
+                                         int timeoutMillis);
+
+      // oneway == true sends without checking any response; otherwise the response code is
+      // checked and MQClientException is thrown on failure.
+      void unlockBatchMQ(const std::string& addr,
+                         UnlockBatchRequestBody* pRequestBody,
+                         int timeoutMillis,
+                         bool oneway);
+
+      TopicStatsTable getTopicStatsInfo(const std::string& addr,
+                                        const std::string& topic,
+                                        int timeoutMillis);
+
+      ConsumeStats getConsumeStats(const std::string& addr,
+                                   const std::string& consumerGroup,
+                                   int timeoutMillis);
+
+      ProducerConnection* getProducerConnectionList(const std::string& addr,
+              const std::string& producerGroup,
+              int timeoutMillis);
+
+      ConsumerConnection* getConsumerConnectionList(const std::string& addr,
+              const std::string& consumerGroup,
+              int timeoutMillis);
+
+      KVTable getBrokerRuntimeInfo(const std::string& addr,  int 
timeoutMillis);
+
+      void updateBrokerConfig(const std::string& addr,
+                              const std::map<std::string, std::string>& 
properties,
+                              int timeoutMillis);
+
+      ClusterInfo* getBrokerClusterInfo(int timeoutMillis);
+
+      // Both route lookups return NULL when no response arrives and throw MQClientException
+      // for non-success responses; only the second applies the project-group prefix to the topic.
+      TopicRouteData* getDefaultTopicRouteInfoFromNameServer(const 
std::string& topic, int timeoutMillis);
+
+      TopicRouteData* getTopicRouteInfoFromNameServer(const std::string& 
topic, int timeoutMillis);
+
+      TopicList* getTopicListFromNameServer(int timeoutMillis);
+
+      int wipeWritePermOfBroker(const std::string& namesrvAddr,
+                                const std::string& brokerName,
+                                int timeoutMillis);
+
+      void deleteTopicInBroker(const std::string& addr, const std::string& 
topic, int timeoutMillis);
+      void deleteTopicInNameServer(const std::string& addr, const std::string& 
topic, int timeoutMillis);
+      void deleteSubscriptionGroup(const std::string& addr,
+                                   const std::string& groupName,
+                                   int timeoutMillis);
+
+      // Returns the value from the response header; throws MQClientException on failure.
+      std::string getKVConfigValue(const std::string& projectNamespace,
+                                   const std::string& key,
+                                   int timeoutMillis);
+
+      void putKVConfigValue(const std::string& projectNamespace,
+                            const std::string& key,
+                            const std::string& value,
+                            int timeoutMillis);
+
+      void deleteKVConfigValue(const std::string& projectNamespace, const 
std::string& key, int timeoutMillis);
+
+      // Calls getKVConfigValue with NamesrvUtil::NAMESPACE_PROJECT_CONFIG and ip as the key.
+      std::string getProjectGroupByIp(const std::string& ip,  int 
timeoutMillis);
+
+      std::string getKVConfigByValue(const std::string& projectNamespace,
+                                     const std::string& projectGroup,
+                                     int timeoutMillis);
+
+      KVTable getKVListByNamespace(const std::string& projectNamespace,  int 
timeoutMillis);
+
+      void deleteKVConfigByValue(const std::string& projectNamespace,
+                                 const std::string& projectGroup,
+                                 int timeoutMillis);
+
+      TcpRemotingClient* getRemotingClient();
+
+      // Returns NULL for a NULL response, a new SendResult for success/flush/slave status
+      // codes, and throws MQClientException for any other code.
+      SendResult* processSendResponse(const std::string& brokerName,
+                                      const std::string& topic,
+                                      RemotingCommand* pResponse);
+
+      // Returns a new PullResultExt; throws MQBrokerException for unexpected response codes.
+      // pResponse is dereferenced without a NULL check.
+      PullResult* processPullResponse(RemotingCommand* pResponse);
+
+  private:
+      // Transport helpers; the async variants wrap the user callback in a
+      // ProducerInvokeCallback / ConsumerInvokeCallback before calling invokeAsync.
+      SendResult* sendMessageSync(const std::string& addr,
+                                  const std::string& brokerName,
+                                  Message& msg,
+                                  int timeoutMillis,
+                                  RemotingCommand* request);
+
+      void sendMessageAsync(const std::string& addr,
+                            const std::string& brokerName,
+                            Message& msg,
+                            int timeoutMillis,
+                            RemotingCommand* request,
+                            SendCallback* pSendCallback);
+
+      void pullMessageAsync(const std::string& addr,
+                            RemotingCommand* pRequest,
+                            int timeoutMillis,
+                            PullCallback* pPullCallback);
+
+      PullResult* pullMessageSync(const std::string& addr,
+                                  RemotingCommand* pRequest,
+                                  int timeoutMillis);
+
+  private:
+      // Allocated with new in the constructor.
+      // NOTE(review): the class is implicitly copyable while holding this raw pointer -
+      // confirm who deletes it and whether copies must be prevented.
+      TcpRemotingClient* m_pRemotingClient;
+      TopAddressing m_topAddressing;
+      // Set from the constructor argument.
+      ClientRemotingProcessor* m_pClientRemotingProcessor;
+      std::string m_nameSrvAddr;
+      // When not blank, topics and consumer groups are rewritten with VirtualEnvUtil on the
+      // way out and cleaned again on the way back.
+      std::string m_projectGroupPrefix;
+  };
+}
+
+#endif

Reply via email to