http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp 
b/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp
new file mode 100755
index 0000000..fb2d2a6
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp
@@ -0,0 +1,672 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "CommandCustomHeader.h"
+
+#include <stdlib.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <sstream>
+#include <string>
+#include <cstdlib>
+#include "RemotingCommand.h"
+#include "MQProtos.h"
+#include "KPRUtil.h"
+#include "UtilAll.h"
+
+#include "json/json.h"
+
+namespace rmq
+{
+
+
+/**
+* Select the header decoder that matches a command code and run it.
+*
+* @param code            command code, compared against the *_VALUE constants
+* @param data            JSON value handed to the selected decoder
+* @param isResponseType  true selects the response-header decoders,
+*                        false selects the request-header decoders
+* @return the pointer returned by the selected decoder, or NULL when no
+*         decoder is registered for the code or the decoder threw
+*         (the exception is logged and swallowed)
+*/
+CommandCustomHeader* CommandCustomHeader::decode(int code, Json::Value& data,
+        bool isResponseType)
+{
+    CommandCustomHeader* pCustomHeader = NULL;
+
+    try
+    {
+        if (isResponseType)
+        {
+            switch (code)
+            {
+                case SEND_MESSAGE_VALUE:
+                case SEND_MESSAGE_V2_VALUE:
+                    pCustomHeader = SendMessageResponseHeader::decode(data);
+                    break;
+                case PULL_MESSAGE_VALUE:
+                    pCustomHeader = PullMessageResponseHeader::decode(data);
+                    break;
+                case QUERY_CONSUMER_OFFSET_VALUE:
+                    pCustomHeader =
+                        QueryConsumerOffsetResponseHeader::decode(data);
+                    break;
+                case SEARCH_OFFSET_BY_TIMESTAMP_VALUE:
+                    pCustomHeader = SearchOffsetResponseHeader::decode(data);
+                    break;
+                case GET_MAX_OFFSET_VALUE:
+                    pCustomHeader = GetMaxOffsetResponseHeader::decode(data);
+                    break;
+                case GET_MIN_OFFSET_VALUE:
+                    pCustomHeader = GetMinOffsetResponseHeader::decode(data);
+                    break;
+                case GET_EARLIEST_MSG_STORETIME_VALUE:
+                    pCustomHeader =
+                        GetEarliestMsgStoretimeResponseHeader::decode(data);
+                    break;
+                case QUERY_MESSAGE_VALUE:
+                    pCustomHeader = QueryMessageResponseHeader::decode(data);
+                    break;
+                case GET_KV_CONFIG_VALUE:
+                    pCustomHeader = GetKVConfigResponseHeader::decode(data);
+                    break;
+                default:
+                    // No response header is decoded for other codes.
+                    break;
+            }
+        }
+        else
+        {
+            switch (code)
+            {
+                case NOTIFY_CONSUMER_IDS_CHANGED_VALUE:
+                    pCustomHeader =
+                        NotifyConsumerIdsChangedRequestHeader::decode(data);
+                    break;
+                case GET_CONSUMER_RUNNING_INFO_VALUE:
+                    pCustomHeader =
+                        GetConsumerRunningInfoRequestHeader::decode(data);
+                    break;
+                default:
+                    // No request header is decoded for other codes.
+                    break;
+            }
+        }
+    }
+    catch (const std::exception& e)
+    {
+        // pCustomHeader is only assigned from a decoder that returned
+        // normally, so it is still NULL here and nothing needs releasing.
+        RMQ_ERROR("CommandCustomHeader decode exception, %d, %d, %s, %s",
+            code, isResponseType, UtilAll::toString(data).c_str(), e.what());
+    }
+    catch (...)
+    {
+        // Same reasoning as above: nothing has been assigned yet.
+        RMQ_ERROR("CommandCustomHeader decode exception, %d, %d, %s",
+            code, isResponseType, UtilAll::toString(data).c_str());
+    }
+
+    return pCustomHeader;
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+//GET_ROUTEINTO_BY_TOPIC_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+* Write the header as a one-member JSON object: {"topic":"<topic>"}.
+* The topic text is inserted verbatim, without any escaping.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void GetRouteInfoRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+    json << "{";
+    json << "\"topic\":\"" << topic << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// UPDATE_AND_CREATE_TOPIC_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+* Serialize the create-topic header as a JSON object whose values are all
+* written as quoted strings.
+*
+* Fixes a copy/paste defect: "topicSysFlag" and "order" previously both
+* carried the value of topicFilterType. They now carry their own members.
+* The boolean is written as "true"/"false" rather than the stream default
+* 1/0 (assumes the receiving side parses booleans textually -- confirm
+* against the broker).
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void CreateTopicRequestHeader::encode(std::string& outData)
+{
+    std::stringstream ss;
+
+    ss << "{"
+       << "\"topic\":\"" << topic << "\","
+       << "\"defaultTopic\":\"" << defaultTopic << "\","
+       << "\"readQueueNums\":\"" << readQueueNums << "\","
+       << "\"writeQueueNums\":\"" << writeQueueNums << "\","
+       << "\"perm\":\"" << perm << "\","
+       << "\"topicFilterType\":\"" << topicFilterType << "\","
+       << "\"topicSysFlag\":\"" << topicSysFlag << "\","
+       << "\"order\":\"" << (order ? "true" : "false") << "\""
+       << "}";
+
+    outData = ss.str();
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// SEND_MESSAGE_VALUE/SEND_MESSAGE_V2_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+* Serialize the send-message header as a JSON object. String members are
+* quoted (and inserted verbatim); numeric members are written bare.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void SendMessageRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"producerGroup\":\"" << producerGroup << "\",";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"defaultTopic\":\"" << defaultTopic << "\",";
+    json << "\"defaultTopicQueueNums\":" << defaultTopicQueueNums << ",";
+    json << "\"queueId\":" << queueId << ",";
+    json << "\"sysFlag\":" << sysFlag << ",";
+    json << "\"bornTimestamp\":" << bornTimestamp << ",";
+    json << "\"flag\":" << flag << ",";
+    json << "\"properties\":\"" << properties << "\",";
+    json << "\"reconsumeTimes\":" << reconsumeTimes;
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Serialize the V2 send-message header: members a..j are written under
+* their one-letter keys, every value as a quoted string.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void SendMessageRequestHeaderV2::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"a\":\"" << a << "\",";
+    json << "\"b\":\"" << b << "\",";
+    json << "\"c\":\"" << c << "\",";
+    json << "\"d\":\"" << d << "\",";
+    json << "\"e\":\"" << e << "\",";
+    json << "\"f\":\"" << f << "\",";
+    json << "\"g\":\"" << g << "\",";
+    json << "\"h\":\"" << h << "\",";
+    json << "\"i\":\"" << i << "\",";
+    json << "\"j\":\"" << j << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Build a V1 header holding a field-by-field copy of a V2 header.
+*
+* @param v2  source header; dereferenced without a NULL check
+* @return header allocated with new
+*/
+SendMessageRequestHeader*
+SendMessageRequestHeaderV2::createSendMessageRequestHeaderV1(
+        const SendMessageRequestHeaderV2* v2)
+{
+    SendMessageRequestHeader* converted = new SendMessageRequestHeader();
+
+    // One-letter V2 members map onto the named V1 members in order a..j.
+    converted->producerGroup         = v2->a;
+    converted->topic                 = v2->b;
+    converted->defaultTopic          = v2->c;
+    converted->defaultTopicQueueNums = v2->d;
+    converted->queueId               = v2->e;
+    converted->sysFlag               = v2->f;
+    converted->bornTimestamp         = v2->g;
+    converted->flag                  = v2->h;
+    converted->properties            = v2->i;
+    converted->reconsumeTimes        = v2->j;
+
+    return converted;
+}
+
+/**
+* Build a V2 header holding a field-by-field copy of a V1 header.
+*
+* @param v1  source header; dereferenced without a NULL check
+* @return header allocated with new
+*/
+SendMessageRequestHeaderV2*
+SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2(
+        const SendMessageRequestHeader* v1)
+{
+    SendMessageRequestHeaderV2* converted = new SendMessageRequestHeaderV2();
+
+    // Named V1 members map onto the one-letter V2 members in order a..j.
+    converted->a = v1->producerGroup;
+    converted->b = v1->topic;
+    converted->c = v1->defaultTopic;
+    converted->d = v1->defaultTopicQueueNums;
+    converted->e = v1->queueId;
+    converted->f = v1->sysFlag;
+    converted->g = v1->bornTimestamp;
+    converted->h = v1->flag;
+    converted->i = v1->properties;
+    converted->j = v1->reconsumeTimes;
+
+    return converted;
+}
+
+void SendMessageResponseHeader::encode(std::string& outData)
+{   // Empty body: nothing is serialized and outData is left unmodified.
+}
+
+/**
+* Build a send-message response header from the "msgId", "queueId" and
+* "queueOffset" members of data. The numeric members are read as C strings
+* and converted with atoi / KPRUtil::str2ll.
+*
+* @param data  JSON value to read from
+* @return header allocated with new
+*/
+CommandCustomHeader* SendMessageResponseHeader::decode(Json::Value& data)
+{
+    // Every member is converted before the allocation, so nothing is
+    // allocated yet if one of the conversions throws.
+    const std::string parsedMsgId = data["msgId"].asString();
+    const int parsedQueueId = atoi(data["queueId"].asCString());
+    const long long parsedQueueOffset =
+        KPRUtil::str2ll(data["queueOffset"].asCString());
+
+    SendMessageResponseHeader* header = new SendMessageResponseHeader();
+    header->msgId = parsedMsgId;
+    header->queueId = parsedQueueId;
+    header->queueOffset = parsedQueueOffset;
+
+    return header;
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// PULL_MESSAGE_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+* Serialize the pull-message request header as a JSON object whose values
+* are all written as quoted strings (inserted verbatim).
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void PullMessageRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"consumerGroup\":\"" << consumerGroup << "\",";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"queueId\":\"" << queueId << "\",";
+    json << "\"queueOffset\":\"" << queueOffset << "\",";
+    json << "\"maxMsgNums\":\"" << maxMsgNums << "\",";
+    json << "\"sysFlag\":\"" << sysFlag << "\",";
+    json << "\"commitOffset\":\"" << commitOffset << "\",";
+    json << "\"suspendTimeoutMillis\":\"" << suspendTimeoutMillis << "\",";
+    json << "\"subscription\":\"" << subscription << "\",";
+    json << "\"subVersion\":\"" << subVersion << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Serialize the pull-message response header as a JSON object with four
+* members, each written as a quoted string.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void PullMessageResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"suggestWhichBrokerId\":\"" << suggestWhichBrokerId << "\",";
+    json << "\"nextBeginOffset\":\"" << nextBeginOffset << "\",";
+    json << "\"minOffset\":\"" << minOffset << "\",";
+    json << "\"maxOffset\":\"" << maxOffset << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Build a pull-message response header from the "suggestWhichBrokerId",
+* "nextBeginOffset", "minOffset" and "maxOffset" members of data, each read
+* as a C string and converted with KPRUtil::str2ll.
+*
+* @param data  JSON value to read from
+* @return header allocated with new
+*/
+CommandCustomHeader* PullMessageResponseHeader::decode(Json::Value& data)
+{
+    // Conversions come first; the header is allocated only afterwards.
+    const long long brokerId =
+        KPRUtil::str2ll(data["suggestWhichBrokerId"].asCString());
+    const long long nextOffset =
+        KPRUtil::str2ll(data["nextBeginOffset"].asCString());
+    const long long lowest = KPRUtil::str2ll(data["minOffset"].asCString());
+    const long long highest = KPRUtil::str2ll(data["maxOffset"].asCString());
+
+    PullMessageResponseHeader* header = new PullMessageResponseHeader();
+    header->suggestWhichBrokerId = brokerId;
+    header->nextBeginOffset = nextOffset;
+    header->minOffset = lowest;
+    header->maxOffset = highest;
+
+    return header;
+}
+
+
+
+////////////////////////////////////////////////////////////////////////////////
+// GET_CONSUMER_LIST_BY_GROUP_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+* Write the header as a one-member JSON object:
+* {"consumerGroup":"<consumerGroup>"} (value inserted verbatim).
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void GetConsumerListByGroupRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+    json << "{";
+    json << "\"consumerGroup\":\"" << consumerGroup << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// CONSUMER_SEND_MSG_BACK_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+* Serialize "offset", "group" and "delayLevel" as a JSON object whose
+* values are all written as quoted strings.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void ConsumerSendMsgBackRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"offset\":\"" << offset << "\",";
+    json << "\"group\":\"" << group << "\",";
+    json << "\"delayLevel\":\"" << delayLevel << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// QUERY_CONSUMER_OFFSET_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+* Serialize "consumerGroup", "topic" and "queueId" as a JSON object whose
+* values are all written as quoted strings.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void QueryConsumerOffsetRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"consumerGroup\":\"" << consumerGroup << "\",";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"queueId\":\"" << queueId << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Write the header as {"offset":"<offset>"} (the number is quoted).
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void QueryConsumerOffsetResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+    json << "{";
+    json << "\"offset\":\"" << offset << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Build a query-consumer-offset response header from data.
+*
+* @param data  JSON value to read from; when it has no "offset" member the
+*              resulting offset is -1
+* @return header allocated with new
+*/
+CommandCustomHeader* QueryConsumerOffsetResponseHeader::decode(
+        Json::Value& data)
+{
+    // -1 stands in for an absent "offset" member.
+    const long long parsedOffset = data.isMember("offset")
+        ? KPRUtil::str2ll(data["offset"].asCString())
+        : -1;
+
+    QueryConsumerOffsetResponseHeader* header =
+        new QueryConsumerOffsetResponseHeader();
+    header->offset = parsedOffset;
+
+    return header;
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// UPDATE_CONSUMER_OFFSET_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+* Serialize "consumerGroup", "topic", "queueId" and "commitOffset" as a
+* JSON object whose values are all written as quoted strings.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void UpdateConsumerOffsetRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"consumerGroup\":\"" << consumerGroup << "\",";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"queueId\":\"" << queueId << "\",";
+    json << "\"commitOffset\":\"" << commitOffset << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+
+////////////////////////////////////////////////////////////////////////////////
+// UNREGISTER_CLIENT_VALUE
+////////////////////////////////////////////////////////////////////////////////
+/**
+* Serialize "producerGroup", "consumerGroup" and "clientID" (in that
+* order) as a JSON object of quoted strings, inserted verbatim.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void UnregisterClientRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"producerGroup\":\"" << producerGroup << "\",";
+    json << "\"consumerGroup\":\"" << consumerGroup << "\",";
+    json << "\"clientID\":\"" << clientID << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// VIEW_MESSAGE_BY_ID_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+* Write the header as {"offset":<offset>}; unlike most headers in this
+* file the number is written bare, not quoted.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void ViewMessageRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+    json << "{";
+    json << "\"offset\":" << offset;
+    json << "}";
+
+    outData = json.str();
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// SEARCH_OFFSET_BY_TIMESTAMP_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+* Serialize "topic", "queueId" and "timestamp" as a JSON object whose
+* values are all written as quoted strings.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void SearchOffsetRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"queueId\":\"" << queueId << "\",";
+    json << "\"timestamp\":\"" << timestamp << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Write the header as {"offset":"<offset>"} (the number is quoted).
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void SearchOffsetResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+    json << "{";
+    json << "\"offset\":\"" << offset << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Build a search-offset response header from the "offset" member of data,
+* read as a C string and converted with KPRUtil::str2ll.
+*
+* @param data  JSON value to read from
+* @return header allocated with new
+*/
+CommandCustomHeader* SearchOffsetResponseHeader::decode(Json::Value& data)
+{
+    // Convert before allocating the header.
+    const long long parsedOffset =
+        KPRUtil::str2ll(data["offset"].asCString());
+
+    SearchOffsetResponseHeader* header = new SearchOffsetResponseHeader();
+    header->offset = parsedOffset;
+
+    return header;
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// GET_MAX_OFFSET_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+* Serialize "topic" and "queueId" as a JSON object of quoted strings.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void GetMaxOffsetRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"queueId\":\"" << queueId << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Write the header as {"offset":"<offset>"} (the number is quoted).
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void GetMaxOffsetResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+    json << "{";
+    json << "\"offset\":\"" << offset << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Build a get-max-offset response header from the "offset" member of data,
+* read as a C string and converted with KPRUtil::str2ll.
+*
+* @param data  JSON value to read from
+* @return header allocated with new
+*/
+CommandCustomHeader* GetMaxOffsetResponseHeader::decode(Json::Value& data)
+{
+    // Convert before allocating the header.
+    const long long parsedOffset =
+        KPRUtil::str2ll(data["offset"].asCString());
+
+    GetMaxOffsetResponseHeader* header = new GetMaxOffsetResponseHeader();
+    header->offset = parsedOffset;
+
+    return header;
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// GET_MIN_OFFSET_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+* Serialize "topic" and "queueId" as a JSON object of quoted strings.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void GetMinOffsetRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"queueId\":\"" << queueId << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Write the header as {"offset":"<offset>"} (the number is quoted).
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void GetMinOffsetResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+    json << "{";
+    json << "\"offset\":\"" << offset << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Build a get-min-offset response header from the "offset" member of data,
+* read as a C string and converted with KPRUtil::str2ll.
+*
+* @param data  JSON value to read from
+* @return header allocated with new
+*/
+CommandCustomHeader* GetMinOffsetResponseHeader::decode(Json::Value& data)
+{
+    // Convert before allocating the header.
+    const long long parsedOffset =
+        KPRUtil::str2ll(data["offset"].asCString());
+
+    GetMinOffsetResponseHeader* header = new GetMinOffsetResponseHeader();
+    header->offset = parsedOffset;
+
+    return header;
+}
+
+
+
+///////////////////////////////////////////////////////////////////////
+// GET_EARLIEST_MSG_STORETIME_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+* Serialize "topic" and "queueId" as a JSON object of quoted strings.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void GetEarliestMsgStoretimeRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"queueId\":\"" << queueId << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Write the header as {"timestamp":"<timestamp>"} (the number is quoted).
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void GetEarliestMsgStoretimeResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+    json << "{";
+    json << "\"timestamp\":\"" << timestamp << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+
+/**
+* Build the response header from the "timestamp" member of data, read as
+* a C string and converted with KPRUtil::str2ll.
+*
+* @param data  JSON value to read from
+* @return header allocated with new
+*/
+CommandCustomHeader*
+GetEarliestMsgStoretimeResponseHeader::decode(Json::Value& data)
+{
+    // Convert before allocating the header.
+    const long long parsedTimestamp =
+        KPRUtil::str2ll(data["timestamp"].asCString());
+
+    GetEarliestMsgStoretimeResponseHeader* header =
+        new GetEarliestMsgStoretimeResponseHeader();
+    header->timestamp = parsedTimestamp;
+
+    return header;
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// QUERY_MESSAGE_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+* Serialize "topic", "key", "maxNum", "beginTimestamp" and "endTimestamp"
+* as a JSON object whose values are all written as quoted strings.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void QueryMessageRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"topic\":\"" << topic << "\",";
+    json << "\"key\":\"" << key << "\",";
+    json << "\"maxNum\":\"" << maxNum << "\",";
+    json << "\"beginTimestamp\":\"" << beginTimestamp << "\",";
+    json << "\"endTimestamp\":\"" << endTimestamp << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Serialize "indexLastUpdateTimestamp" and "indexLastUpdatePhyoffset" as a
+* JSON object whose values are written as quoted strings.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void QueryMessageResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"indexLastUpdateTimestamp\":\""
+         << indexLastUpdateTimestamp << "\",";
+    json << "\"indexLastUpdatePhyoffset\":\""
+         << indexLastUpdatePhyoffset << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Build a query-message response header from the
+* "indexLastUpdateTimestamp" and "indexLastUpdatePhyoffset" members of
+* data, each read as a C string and converted with KPRUtil::str2ll.
+*
+* @param data  JSON value to read from
+* @return header allocated with new
+*/
+CommandCustomHeader* QueryMessageResponseHeader::decode(Json::Value& data)
+{
+    // Conversions come first; the header is allocated only afterwards.
+    const long long lastTimestamp =
+        KPRUtil::str2ll(data["indexLastUpdateTimestamp"].asCString());
+    const long long lastPhyoffset =
+        KPRUtil::str2ll(data["indexLastUpdatePhyoffset"].asCString());
+
+    QueryMessageResponseHeader* header = new QueryMessageResponseHeader();
+    header->indexLastUpdateTimestamp = lastTimestamp;
+    header->indexLastUpdatePhyoffset = lastPhyoffset;
+
+    return header;
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// GET_KV_CONFIG_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+* Serialize the header as a JSON object with the keys "namespace" (taken
+* from the namespace_ member) and "key", both as quoted strings.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void GetKVConfigRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+
+    json << "{";
+    json << "\"namespace\":\"" << namespace_ << "\",";
+    json << "\"key\":\"" << key << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Write the header as {"value":"<value>"} (value inserted verbatim).
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void GetKVConfigResponseHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+    json << "{";
+    json << "\"value\":\"" << value << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Build a get-kv-config response header from the "value" member of data.
+*
+* The member is converted before the header is allocated, matching the
+* other decoders in this file: if the conversion throws, no header has
+* been allocated yet, whereas allocating first would leak it.
+*
+* @param data  JSON value to read from
+* @return header allocated with new
+*/
+CommandCustomHeader* GetKVConfigResponseHeader::decode(Json::Value& data)
+{
+    const std::string parsedValue = data["value"].asString();
+
+    GetKVConfigResponseHeader* h = new GetKVConfigResponseHeader();
+    h->value = parsedValue;
+
+    return h;
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// NOTIFY_CONSUMER_IDS_CHANGED_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+* Write the header as {"consumerGroup":"<consumerGroup>"} (value inserted
+* verbatim).
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void NotifyConsumerIdsChangedRequestHeader::encode(std::string& outData)
+{
+    std::ostringstream json;
+    json << "{";
+    json << "\"consumerGroup\":\"" << consumerGroup << "\"";
+    json << "}";
+
+    outData = json.str();
+}
+
+/**
+* Build a notify-consumer-ids-changed request header from the
+* "consumerGroup" member of data.
+*
+* The member is converted before the header is allocated, matching the
+* response decoders in this file: if the conversion throws, no header has
+* been allocated yet, whereas allocating first would leak it.
+*
+* @param data  JSON value to read from
+* @return header allocated with new
+*/
+CommandCustomHeader*
+NotifyConsumerIdsChangedRequestHeader::decode(Json::Value& data)
+{
+    const std::string parsedGroup = data["consumerGroup"].asString();
+
+    NotifyConsumerIdsChangedRequestHeader* h =
+        new NotifyConsumerIdsChangedRequestHeader();
+    h->consumerGroup = parsedGroup;
+
+    return h;
+}
+
+
+///////////////////////////////////////////////////////////////////////
+// GET_CONSUMER_RUNNING_INFO_VALUE
+///////////////////////////////////////////////////////////////////////
+/**
+* Serialize "consumerGroup", "clientId" and "jstackEnable" as a JSON
+* object whose values are written as quoted strings.
+*
+* Fixes the serialized form: the last member used to be followed by a
+* comma, producing `...,}` which is not valid JSON.
+*
+* @param outData  receives the serialized text (previous content is replaced)
+*/
+void GetConsumerRunningInfoRequestHeader::encode(std::string& outData)
+{
+    std::stringstream ss;
+    ss  << "{"
+        << "\"consumerGroup\":\"" << consumerGroup << "\","
+        << "\"clientId\":\"" << clientId << "\","
+        << "\"jstackEnable\":\"" << jstackEnable << "\""  // last: no comma
+        << "}";
+    outData = ss.str();
+}
+
+/**
+* Build a get-consumer-running-info request header from the
+* "consumerGroup" and "clientId" members of data. jstackEnable is always
+* set to false (not supported).
+*
+* Both members are converted before the header is allocated, matching the
+* response decoders in this file: if a conversion throws, no header has
+* been allocated yet, whereas allocating first would leak it.
+*
+* @param data  JSON value to read from
+* @return header allocated with new
+*/
+CommandCustomHeader* GetConsumerRunningInfoRequestHeader::decode(
+        Json::Value& data)
+{
+    const std::string parsedGroup = data["consumerGroup"].asString();
+    const std::string parsedClientId = data["clientId"].asString();
+
+    GetConsumerRunningInfoRequestHeader* h =
+        new GetConsumerRunningInfoRequestHeader();
+    h->consumerGroup = parsedGroup;
+    h->clientId = parsedClientId;
+    h->jstackEnable = false;//not support
+
+    return h;
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h 
b/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h
new file mode 100755
index 0000000..93f811a
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h
@@ -0,0 +1,604 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __COMMANDCUSTOMHEADER_H__
+#define __COMMANDCUSTOMHEADER_H__
+
+#include <string>
+#include <json/json.h>
+
+namespace rmq
+{
+    /**
+    * Abstract base of the custom headers attached to a RemotingCommand.
+    * Concrete headers implement encode(); the static decode() chooses a
+    * concrete header from a command code.
+    */
+    class CommandCustomHeader
+    {
+    public:
+        virtual ~CommandCustomHeader() {}
+
+        // Serialize the header's fields into outData.
+        virtual void encode(std::string& outData) = 0;
+
+        // Build the header that corresponds to code from data.
+        static CommandCustomHeader* decode(int code,
+                                           Json::Value& data,
+                                           bool isResponseType);
+    };
+
+       ///////////////////////////////////////////////////////////////////////
+       // GET_ROUTEINTO_BY_TOPIC_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    class GetRouteInfoRequestHeader : public CommandCustomHeader
+    {
+    public:
+        // Nothing to initialize: topic is default-constructed (empty).
+        GetRouteInfoRequestHeader() {}
+        ~GetRouteInfoRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string topic;
+    };
+
+       ///////////////////////////////////////////////////////////////////////
+       // UPDATE_AND_CREATE_TOPIC_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    class CreateTopicRequestHeader : public CommandCustomHeader
+    {
+    public:
+        // Numeric members start at 0 and order at false; the string
+        // members are default-constructed (empty).
+        CreateTopicRequestHeader()
+            : readQueueNums(0),
+              writeQueueNums(0),
+              perm(0),
+              topicSysFlag(0),
+              order(false)
+        {
+        }
+        ~CreateTopicRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string topic;
+        std::string defaultTopic;
+        int readQueueNums;
+        int writeQueueNums;
+        int perm;
+        std::string topicFilterType;
+        int topicSysFlag;
+        bool order;
+    };
+
+       ///////////////////////////////////////////////////////////////////////
+       // SEND_MESSAGE_VALUE/SEND_MESSAGE_V2_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    class SendMessageRequestHeader: public CommandCustomHeader
+    {
+    public:
+        // All numeric members start at 0; strings start empty.
+        SendMessageRequestHeader()
+            : defaultTopicQueueNums(0),
+              queueId(0),
+              sysFlag(0),
+              bornTimestamp(0),
+              flag(0),
+              reconsumeTimes(0)
+        {
+        }
+        ~SendMessageRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string producerGroup;
+        std::string topic;
+        std::string defaultTopic;
+        int defaultTopicQueueNums;
+        int queueId;
+        int sysFlag;
+        long long bornTimestamp;
+        int flag;
+        std::string properties;
+        int reconsumeTimes;
+    };
+
+       class SendMessageRequestHeaderV2: public CommandCustomHeader
+       {
+       public:
+           // All numeric members start at 0; strings start empty.
+           SendMessageRequestHeaderV2()
+               : d(0),
+                 e(0),
+                 f(0),
+                 g(0),
+                 h(0),
+                 j(0)
+           {
+           }
+           ~SendMessageRequestHeaderV2() {}
+
+           virtual void encode(std::string& outData);
+           static CommandCustomHeader* decode(Json::Value& data);
+
+           // Converters between the named (V1) and one-letter (V2) layouts.
+           static SendMessageRequestHeader*
+               createSendMessageRequestHeaderV1(
+                   const SendMessageRequestHeaderV2* v2);
+           static SendMessageRequestHeaderV2*
+               createSendMessageRequestHeaderV2(
+                   const SendMessageRequestHeader* v1);
+
+       public:
+           std::string a;  // producerGroup
+           std::string b;  // topic
+           std::string c;  // defaultTopic
+           int d;          // defaultTopicQueueNums
+           int e;          // queueId
+           int f;          // sysFlag
+           long long g;    // bornTimestamp
+           int h;          // flag
+           std::string i;  // properties
+           int j;          // reconsumeTimes
+       };
+
+    class SendMessageResponseHeader: public CommandCustomHeader
+    {
+    public:
+        // queueId and queueOffset start at 0; msgId starts empty.
+        SendMessageResponseHeader()
+            : queueId(0),
+              queueOffset(0)
+        {
+        }
+        ~SendMessageResponseHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string msgId;
+        int queueId;
+        long long queueOffset;
+    };
+
+
+       ///////////////////////////////////////////////////////////////////////
+       // PULL_MESSAGE_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    class PullMessageRequestHeader: public CommandCustomHeader
+    {
+    public:
+        // All numeric members start at 0; strings start empty.
+        PullMessageRequestHeader()
+            : queueId(0),
+              queueOffset(0),
+              maxMsgNums(0),
+              sysFlag(0),
+              commitOffset(0),
+              suspendTimeoutMillis(0),
+              subVersion(0)
+        {
+        }
+        ~PullMessageRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string consumerGroup;
+        std::string topic;
+        int queueId;
+        long long queueOffset;
+        int maxMsgNums;
+        int sysFlag;
+        long long commitOffset;
+        long long suspendTimeoutMillis;
+        std::string subscription;
+        long long subVersion;
+    };
+
+    class PullMessageResponseHeader: public CommandCustomHeader
+    {
+    public:
+        // Every member starts at 0.
+        PullMessageResponseHeader()
+            : suggestWhichBrokerId(0),
+              nextBeginOffset(0),
+              minOffset(0),
+              maxOffset(0)
+        {
+        }
+        ~PullMessageResponseHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        long long suggestWhichBrokerId;
+        long long nextBeginOffset;
+        long long minOffset;
+        long long maxOffset;
+    };
+
+       ///////////////////////////////////////////////////////////////////////
+       // GET_CONSUMER_LIST_BY_GROUP_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    class GetConsumerListByGroupRequestHeader : public CommandCustomHeader
+    {
+    public:
+        // Nothing to initialize: consumerGroup starts empty.
+        GetConsumerListByGroupRequestHeader() {}
+        ~GetConsumerListByGroupRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string consumerGroup;
+    };
+
+
+       ///////////////////////////////////////////////////////////////////////
+       // CONSUMER_SEND_MSG_BACK_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    class ConsumerSendMsgBackRequestHeader : public CommandCustomHeader
+    {
+    public:
+        // offset and delayLevel start at 0; group starts empty.
+        ConsumerSendMsgBackRequestHeader()
+            : offset(0),
+              delayLevel(0)
+        {
+        }
+        ~ConsumerSendMsgBackRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        long long offset;
+        std::string group;
+        int delayLevel;
+    };
+
+
+       ///////////////////////////////////////////////////////////////////////
+       // QUERY_CONSUMER_OFFSET_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    class QueryConsumerOffsetRequestHeader : public CommandCustomHeader
+    {
+    public:
+        // queueId starts at 0; the string members start empty.
+        QueryConsumerOffsetRequestHeader()
+            : queueId(0)
+        {
+        }
+        ~QueryConsumerOffsetRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string consumerGroup;
+        std::string topic;
+        int queueId;
+    };
+
+    class QueryConsumerOffsetResponseHeader : public CommandCustomHeader
+    {
+    public:
+        // offset starts at 0.
+        QueryConsumerOffsetResponseHeader()
+            : offset(0)
+        {
+        }
+        ~QueryConsumerOffsetResponseHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        long long offset;
+    };
+
+       ///////////////////////////////////////////////////////////////////////
+       // UPDATE_CONSUMER_OFFSET_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    class UpdateConsumerOffsetRequestHeader : public CommandCustomHeader
+    {
+    public:
+        // queueId and commitOffset start at 0; strings start empty.
+        UpdateConsumerOffsetRequestHeader()
+            : queueId(0),
+              commitOffset(0)
+        {
+        }
+        ~UpdateConsumerOffsetRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        std::string consumerGroup;
+        std::string topic;
+        int queueId;
+        long long commitOffset;
+    };
+
+       ///////////////////////////////////////////////////////////////////////
+       // UNREGISTER_CLIENT_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header for UNREGISTER_CLIENT_VALUE.
+     *
+     * Public fields: clientID, producerGroup, consumerGroup; all are strings
+     * and start empty. encode()/decode() are defined out of line.
+     */
+    class UnregisterClientRequestHeader : public CommandCustomHeader
+    {
+    public:
+        UnregisterClientRequestHeader() {}
+        ~UnregisterClientRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header fields.
+        std::string clientID;
+        std::string producerGroup;
+        std::string consumerGroup;
+    };
+
+
+       ///////////////////////////////////////////////////////////////////////
+       // VIEW_MESSAGE_BY_ID_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header for VIEW_MESSAGE_BY_ID_VALUE.
+     *
+     * Holds a single public field, offset, which starts at 0.
+     * encode()/decode() are defined out of line.
+     */
+    class ViewMessageRequestHeader : public CommandCustomHeader
+    {
+    public:
+        ViewMessageRequestHeader()
+            : offset(0)
+        {
+        }
+        ~ViewMessageRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header field.
+        long long offset;
+    };
+
+       ///////////////////////////////////////////////////////////////////////
+       // SEARCH_OFFSET_BY_TIMESTAMP_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header for SEARCH_OFFSET_BY_TIMESTAMP_VALUE.
+     *
+     * Public fields: topic, queueId, timestamp. The numeric fields start
+     * at 0. encode()/decode() are defined out of line.
+     */
+    class SearchOffsetRequestHeader : public CommandCustomHeader
+    {
+    public:
+        SearchOffsetRequestHeader()
+            : queueId(0), timestamp(0)
+        {
+        }
+        ~SearchOffsetRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header fields.
+        std::string topic;
+        int queueId;
+        long long timestamp;
+    };
+
+    /**
+     * Response header for SEARCH_OFFSET_BY_TIMESTAMP_VALUE.
+     *
+     * Holds a single public field, offset, which starts at 0.
+     * encode()/decode() are defined out of line.
+     */
+    class SearchOffsetResponseHeader : public CommandCustomHeader
+    {
+    public:
+        SearchOffsetResponseHeader()
+            : offset(0)
+        {
+        }
+        ~SearchOffsetResponseHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header field.
+        long long offset;
+    };
+
+       ///////////////////////////////////////////////////////////////////////
+       // GET_MAX_OFFSET_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header for GET_MAX_OFFSET_VALUE.
+     *
+     * Public fields: topic and queueId (queueId starts at 0).
+     * encode()/decode() are defined out of line.
+     */
+    class GetMaxOffsetRequestHeader : public CommandCustomHeader
+    {
+    public:
+        GetMaxOffsetRequestHeader()
+            : queueId(0)
+        {
+        }
+        ~GetMaxOffsetRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header fields.
+        std::string topic;
+        int queueId;
+    };
+
+    /**
+     * Response header for GET_MAX_OFFSET_VALUE.
+     *
+     * Holds a single public field, offset, which starts at 0.
+     * encode()/decode() are defined out of line.
+     */
+    class GetMaxOffsetResponseHeader : public CommandCustomHeader
+    {
+    public:
+        GetMaxOffsetResponseHeader()
+            : offset(0)
+        {
+        }
+        ~GetMaxOffsetResponseHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header field.
+        long long offset;
+    };
+
+       ///////////////////////////////////////////////////////////////////////
+       // GET_MIN_OFFSET_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header for GET_MIN_OFFSET_VALUE.
+     *
+     * Public fields: topic and queueId (queueId starts at 0).
+     * encode()/decode() are defined out of line.
+     */
+    class GetMinOffsetRequestHeader : public CommandCustomHeader
+    {
+    public:
+        GetMinOffsetRequestHeader()
+            : queueId(0)
+        {
+        }
+        ~GetMinOffsetRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header fields.
+        std::string topic;
+        int queueId;
+    };
+
+    /**
+     * Response header for GET_MIN_OFFSET_VALUE.
+     *
+     * Holds a single public field, offset, which starts at 0.
+     * encode()/decode() are defined out of line.
+     */
+    class GetMinOffsetResponseHeader : public CommandCustomHeader
+    {
+    public:
+        GetMinOffsetResponseHeader()
+            : offset(0)
+        {
+        }
+        ~GetMinOffsetResponseHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header field.
+        long long offset;
+    };
+
+
+       ///////////////////////////////////////////////////////////////////////
+       // GET_EARLIEST_MSG_STORETIME_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header for GET_EARLIEST_MSG_STORETIME_VALUE.
+     *
+     * Public fields: topic and queueId (queueId starts at 0).
+     * encode()/decode() are defined out of line.
+     */
+    class GetEarliestMsgStoretimeRequestHeader : public CommandCustomHeader
+    {
+    public:
+        GetEarliestMsgStoretimeRequestHeader()
+            : queueId(0)
+        {
+        }
+        ~GetEarliestMsgStoretimeRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header fields.
+        std::string topic;
+        int queueId;
+    };
+
+    /**
+     * Response header for GET_EARLIEST_MSG_STORETIME_VALUE.
+     *
+     * Holds a single public field, timestamp, which starts at 0.
+     * encode()/decode() are defined out of line.
+     */
+    class GetEarliestMsgStoretimeResponseHeader : public CommandCustomHeader
+    {
+    public:
+        GetEarliestMsgStoretimeResponseHeader()
+            : timestamp(0)
+        {
+        }
+        ~GetEarliestMsgStoretimeResponseHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header field.
+        long long timestamp;
+    };
+
+       ///////////////////////////////////////////////////////////////////////
+       // QUERY_MESSAGE_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header for QUERY_MESSAGE_VALUE.
+     *
+     * Public fields: topic, key, maxNum, beginTimestamp, endTimestamp. The
+     * three numeric fields start at 0. encode()/decode() are defined out of
+     * line.
+     */
+    class QueryMessageRequestHeader : public CommandCustomHeader
+    {
+    public:
+        QueryMessageRequestHeader()
+            : maxNum(0), beginTimestamp(0), endTimestamp(0)
+        {
+        }
+        ~QueryMessageRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header fields.
+        std::string topic;
+        std::string key;
+        int maxNum;
+        long long beginTimestamp;
+        long long endTimestamp;
+    };
+
+    /**
+     * Response header for QUERY_MESSAGE_VALUE.
+     *
+     * Public fields: indexLastUpdateTimestamp and indexLastUpdatePhyoffset,
+     * both starting at 0. encode()/decode() are defined out of line.
+     */
+    class QueryMessageResponseHeader : public CommandCustomHeader
+    {
+    public:
+        QueryMessageResponseHeader()
+            : indexLastUpdateTimestamp(0), indexLastUpdatePhyoffset(0)
+        {
+        }
+        ~QueryMessageResponseHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header fields.
+        long long indexLastUpdateTimestamp;
+        long long indexLastUpdatePhyoffset;
+    };
+
+       ///////////////////////////////////////////////////////////////////////
+       // GET_KV_CONFIG_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header for GET_KV_CONFIG_VALUE.
+     *
+     * Public fields: namespace_ and key, both strings that start empty. The
+     * trailing underscore is needed because `namespace` is a C++ keyword.
+     * encode()/decode() are defined out of line.
+     */
+    class GetKVConfigRequestHeader : public CommandCustomHeader
+    {
+    public:
+        GetKVConfigRequestHeader() {}
+        ~GetKVConfigRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header fields.
+        std::string namespace_;
+        std::string key;
+    };
+
+    /**
+     * Response header for GET_KV_CONFIG_VALUE.
+     *
+     * Holds a single public string field, value, which starts empty.
+     * encode()/decode() are defined out of line.
+     */
+    class GetKVConfigResponseHeader : public CommandCustomHeader
+    {
+    public:
+        GetKVConfigResponseHeader() {}
+        ~GetKVConfigResponseHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header field.
+        std::string value;
+    };
+
+       ///////////////////////////////////////////////////////////////////////
+       // NOTIFY_CONSUMER_IDS_CHANGED_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header for NOTIFY_CONSUMER_IDS_CHANGED_VALUE.
+     *
+     * Holds a single public string field, consumerGroup, which starts empty.
+     * encode()/decode() are defined out of line.
+     */
+    class NotifyConsumerIdsChangedRequestHeader : public CommandCustomHeader
+    {
+    public:
+        NotifyConsumerIdsChangedRequestHeader() {}
+        ~NotifyConsumerIdsChangedRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header field.
+        std::string consumerGroup;
+    };
+
+
+       ///////////////////////////////////////////////////////////////////////
+       // GET_CONSUMER_RUNNING_INFO_VALUE
+       ///////////////////////////////////////////////////////////////////////
+    /**
+     * Request header for GET_CONSUMER_RUNNING_INFO_VALUE.
+     *
+     * Public fields: consumerGroup, clientId (both start empty) and
+     * jstackEnable. encode()/decode() are defined out of line.
+     */
+    class GetConsumerRunningInfoRequestHeader : public CommandCustomHeader
+    {
+    public:
+        // jstackEnable is a plain bool, so it must be given a value here:
+        // leaving it unset would make any read before assignment an
+        // indeterminate read. Default to false, matching the zero defaults
+        // used by the other headers in this file.
+        GetConsumerRunningInfoRequestHeader()
+            : jstackEnable(false)
+        {
+        }
+        ~GetConsumerRunningInfoRequestHeader() {}
+
+        virtual void encode(std::string& outData);
+        static CommandCustomHeader* decode(Json::Value& data);
+
+    public:
+        // Header fields.
+        std::string consumerGroup;
+        std::string clientId;
+        bool jstackEnable;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp b/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp
new file mode 100755
index 0000000..58cecde
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp
@@ -0,0 +1,168 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "ConsumerRunningInfo.h"
+
+namespace rmq
+{
+
+// Definitions of the property-name constants declared in ConsumerRunningInfo.
+// Note that PROP_CONSUME_ORDERLY maps to "PROP_CONSUMEORDERLY" (no underscore
+// between CONSUME and ORDERLY); the value is reproduced exactly as authored.
+const std::string ConsumerRunningInfo::PROP_NAMESERVER_ADDR("PROP_NAMESERVER_ADDR");
+const std::string ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE("PROP_THREADPOOL_CORE_SIZE");
+const std::string ConsumerRunningInfo::PROP_CONSUME_ORDERLY("PROP_CONSUMEORDERLY");
+const std::string ConsumerRunningInfo::PROP_CONSUME_TYPE("PROP_CONSUME_TYPE");
+const std::string ConsumerRunningInfo::PROP_CLIENT_VERSION("PROP_CLIENT_VERSION");
+const std::string ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP("PROP_CONSUMER_START_TIMESTAMP");
+
+
+// Nothing to set up: the class currently has no live data members.
+ConsumerRunningInfo::ConsumerRunningInfo() {}
+
+// Nothing to release.
+ConsumerRunningInfo::~ConsumerRunningInfo() {}
+
+/**
+ * Writes an empty JSON object ("{}") into outData, replacing whatever it
+ * held before. No member data is serialized.
+ */
+void ConsumerRunningInfo::encode(std::string& outData)
+{
+    std::string json("{");
+    json += "}";
+    outData = json;
+}
+
+
+/**
+ * Returns a human-readable description of the consumer's running state.
+ *
+ * The C++ client does not implement this report; the function always returns
+ * a fixed placeholder message. The commented-out Java-style code below is
+ * not compiled and is retained only as a reference for what the report
+ * would contain.
+ */
+std::string ConsumerRunningInfo::formatString()
+{
+    // Placeholder text returned to the caller (spelling/grammar corrected).
+    std::string sb = "rocketmq-client4cpp does not support this feature";
+
+    /*
+    // 1
+    {
+        sb.append("#Consumer Properties#\n");
+        Iterator<Entry<Object, Object>> it = m_properties.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<Object, Object> next = it.next();
+            String item =
+                    String.format("%-40s: %s\n", next.getKey().toString(), next.getValue().toString());
+            sb.append(item);
+        }
+    }
+
+    // 2
+    {
+        sb.append("\n\n#Consumer Subscription#\n");
+
+        Iterator<SubscriptionData> it = m_subscriptionSet.iterator();
+        int i = 0;
+        while (it.hasNext()) {
+            SubscriptionData next = it.next();
+            String item = String.format("%03d Topic: %-40s ClassFilter: %-8s SubExpression: %s\n", //
+                ++i,//
+                next.getTopic(),//
+                next.isClassFilterMode(),//
+                next.getSubString());
+
+            sb.append(item);
+        }
+    }
+
+    // 3
+    {
+        sb.append("\n\n#Consumer Offset#\n");
+        sb.append(String.format("%-32s  %-32s  %-4s  %-20s\n",//
+            "#Topic",//
+            "#Broker Name",//
+            "#QID",//
+            "#Consumer Offset"//
+        ));
+
+        Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = m_mqTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<MessageQueue, ProcessQueueInfo> next = it.next();
+            String item = String.format("%-32s  %-32s  %-4d  %-20d\n",//
+                next.getKey().getTopic(),//
+                next.getKey().getBrokerName(),//
+                next.getKey().getQueueId(),//
+                next.getValue().getCommitOffset());
+
+            sb.append(item);
+        }
+    }
+
+    // 4
+    {
+        sb.append("\n\n#Consumer MQ Detail#\n");
+        sb.append(String.format("%-32s  %-32s  %-4s  %-20s\n",//
+            "#Topic",//
+            "#Broker Name",//
+            "#QID",//
+            "#ProcessQueueInfo"//
+        ));
+
+        Iterator<Entry<MessageQueue, ProcessQueueInfo>> it = m_mqTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<MessageQueue, ProcessQueueInfo> next = it.next();
+            String item = String.format("%-32s  %-32s  %-4d  %s\n",//
+                next.getKey().getTopic(),//
+                next.getKey().getBrokerName(),//
+                next.getKey().getQueueId(),//
+                next.getValue().toString());
+
+            sb.append(item);
+        }
+    }
+
+    // 5
+    {
+        sb.append("\n\n#Consumer RT&TPS#\n");
+        sb.append(String.format("%-32s  %14s %14s %14s %14s %18s %25s\n",//
+            "#Topic",//
+            "#Pull RT",//
+            "#Pull TPS",//
+            "#Consume RT",//
+            "#ConsumeOK TPS",//
+            "#ConsumeFailed TPS",//
+            "#ConsumeFailedMsgsInHour"//
+        ));
+
+        Iterator<Entry<String, ConsumeStatus>> it = m_statusTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConsumeStatus> next = it.next();
+            String item = String.format("%-32s  %14.2f %14.2f %14.2f %14.2f %18.2f %25d\n",//
+                next.getKey(),//
+                next.getValue().getPullRT(),//
+                next.getValue().getPullTPS(),//
+                next.getValue().getConsumeRT(),//
+                next.getValue().getConsumeOKTPS(),//
+                next.getValue().getConsumeFailedTPS(),//
+                next.getValue().getConsumeFailedMsgs()//
+                );
+
+            sb.append(item);
+        }
+    }
+
+    // 6
+    if (m_jstack != null) {
+        sb.append("\n\n#Consumer jstack#\n");
+        sb.append(m_jstack);
+    }
+    */
+
+    return sb;
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h b/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h
new file mode 100755
index 0000000..588bf07
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h
@@ -0,0 +1,97 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __ConsumerRunningInfo_H__
+#define __ConsumerRunningInfo_H__
+
+#include <string>
+#include <set>
+#include <map>
+
+#include "RemotingSerializable.h"
+#include "MessageQueue.h"
+#include "SubscriptionData.h"
+#include "ConsumerStatManage.h"
+
+namespace rmq
+{
+    /**
+     * Serializable holder for a consumer's runtime information.
+     *
+     * At present only the property-name constants and the two out-of-line
+     * methods are live; the data members and their accessors are commented
+     * out (they are kept below, uncompiled, for reference).
+     */
+    class ConsumerRunningInfo : public RemotingSerializable
+    {
+    public:
+        ConsumerRunningInfo();
+        ~ConsumerRunningInfo();
+
+        /*
+        std::map<std::string, std::string>& getProperties()
+        {
+            return m_properties;
+        }
+        void setProperties(const std::map<std::string, std::string>& properties)
+        {
+            m_properties = properties;
+        }
+
+        std::map<MessageQueue, ProcessQueueInfo>& getMqTable()
+        {
+            return m_mqTable;
+        }
+        void setMqTable(const std::map<MessageQueue, ProcessQueueInfo>& mqTable)
+        {
+            m_mqTable = mqTable;
+        }
+
+        std::map<std::string, ConsumeStatus>& getStatusTable()
+        {
+            return m_statusTable;
+        }
+        void setStatusTable(const std::map<std::string, ConsumeStatus>& statusTable)
+        {
+            m_statusTable = statusTable;
+        }
+
+        std::set<SubscriptionData>& getSubscriptionSet()
+        {
+            return m_subscriptionSet;
+        }
+        void setSubscriptionSet(const std::set<SubscriptionData>& subscriptionSet)
+        {
+            m_subscriptionSet = subscriptionSet;
+        }
+        */
+
+        // Both are defined in the matching .cpp file.
+        void encode(std::string& outData);
+        std::string formatString();
+
+    public:
+        // Property-name constants; their values are defined out of line.
+        static const std::string PROP_NAMESERVER_ADDR;
+        static const std::string PROP_THREADPOOL_CORE_SIZE;
+        static const std::string PROP_CONSUME_ORDERLY;
+        static const std::string PROP_CONSUME_TYPE;
+        static const std::string PROP_CLIENT_VERSION;
+        static const std::string PROP_CONSUMER_START_TIMESTAMP;
+
+    private:
+        /*
+        std::map<std::string, std::string> m_properties;
+        std::set<SubscriptionData> m_subscriptionSet;
+        std::map<MessageQueue, ProcessQueueInfo> m_mqTable;
+        std::map<string, ConsumerStat> m_statusTable;
+        std::string m_jstack;
+        */
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h b/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h
new file mode 100755
index 0000000..0ea19da
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h
@@ -0,0 +1,97 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __GETCONSUMERLISTBYGROUPRESPONSEBODY_H__
+#define __GETCONSUMERLISTBYGROUPRESPONSEBODY_H__
+
+#include <string>
+#include <sstream>
+#include <list>
+#include "UtilAll.h"
+#include "RemotingSerializable.h"
+
+namespace rmq
+{
+    /**
+     * Response body that carries a list of consumer ids.
+     *
+     * decode() builds an instance from a JSON document of the form
+     * {"consumerIdList":["10.12.22.213@DEFAULT", "10.12.22.213@xxx"]}.
+     * encode() is a no-op.
+     */
+    class GetConsumerListByGroupResponseBody : public RemotingSerializable
+    {
+    public:
+        GetConsumerListByGroupResponseBody() {}
+
+        ~GetConsumerListByGroupResponseBody() {}
+
+        // No serialization is implemented; outData is left untouched.
+        void encode(std::string& outData)
+        {
+        }
+
+        /**
+         * Parses the len bytes starting at pData as JSON and collects every
+         * non-null element of "consumerIdList" as a string.
+         *
+         * @return a heap-allocated body (allocated with new; the caller
+         *         receives the raw pointer), or NULL after logging when the
+         *         JSON cannot be parsed.
+         */
+        static GetConsumerListByGroupResponseBody* decode(const char* pData, int len)
+        {
+            //RMQ_DEBUG("GET_CONSUMER_LIST_BY_GROUP_VALUE:%s", pData);
+
+            Json::Reader parser;
+            Json::Value root;
+            const bool parsed = parser.parse(pData, pData + len, root);
+            if (!parsed)
+            {
+                RMQ_ERROR("parse fail: %s", parser.getFormattedErrorMessages().c_str());
+                return NULL;
+            }
+
+            GetConsumerListByGroupResponseBody* body = new GetConsumerListByGroupResponseBody();
+            Json::Value idArray = root["consumerIdList"];
+            for (size_t idx = 0; idx < idArray.size(); idx++)
+            {
+                Json::Value element = idArray[idx];
+                if (element != Json::Value::null)
+                {
+                    body->m_consumerIdList.push_back(element.asString());
+                }
+            }
+
+            return body;
+        }
+
+        // Mutable access to the stored id list.
+        std::list<std::string>& getConsumerIdList()
+        {
+            return m_consumerIdList;
+        }
+
+        // Replaces the stored id list with a copy of consumerIdList.
+        void setConsumerIdList(const std::list<std::string>& consumerIdList)
+        {
+            m_consumerIdList = consumerIdList;
+        }
+
+        // Debug rendering: "{consumerIdList=<UtilAll::toString of the list>}".
+        std::string toString() const
+        {
+            std::stringstream out;
+            out << "{consumerIdList=";
+            out << UtilAll::toString(m_consumerIdList);
+            out << "}";
+            return out.str();
+        }
+
+    private:
+        std::list<std::string> m_consumerIdList;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp b/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp
new file mode 100755
index 0000000..73f197a
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp
@@ -0,0 +1,52 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "HeartbeatData.h"
+
+namespace rmq
+{
+
+/**
+ * Serializes this heartbeat to JSON and stores the text in outData.
+ *
+ * The document has three members: "clientID", "consumerDataSet" (array built
+ * from each ConsumerData::toJson) and "producerDataSet" (array built from
+ * each ProducerData::toJson). Output is produced by Json::FastWriter.
+ *
+ * @param outData receives the JSON text; any previous content is replaced.
+ */
+void HeartbeatData::encode(std::string& outData)
+{
+    //{"clientID":"10.6.223.90@16164","consumerDataSet":[{"consumeFromWhere":"CONSUME_FROM_LAST_OFFSET","consumeType":"CONSUME_ACTIVELY","groupName":"please_rename_unique_group_name_5","messageModel":"CLUSTERING","subscriptionDataSet":[],"unitMode":false}],"producerDataSet":[{"groupName":"CLIENT_INNER_PRODUCER"}]}
+    Json::Value root;
+    root["clientID"] = m_clientID;
+
+    // Explicit iterator types are used instead of the GNU-only `typeof`
+    // extension so the loops compile on any standard C++ compiler.
+    Json::Value consumers(Json::arrayValue);
+    for (std::set<ConsumerData>::const_iterator consumer = m_consumerDataSet.begin();
+         consumer != m_consumerDataSet.end(); ++consumer)
+    {
+        Json::Value item;
+        consumer->toJson(item);
+        consumers.append(item);
+    }
+    root["consumerDataSet"] = consumers;
+
+    Json::Value producers(Json::arrayValue);
+    for (std::set<ProducerData>::const_iterator producer = m_producerDataSet.begin();
+         producer != m_producerDataSet.end(); ++producer)
+    {
+        Json::Value item;
+        producer->toJson(item);
+        producers.append(item);
+    }
+    root["producerDataSet"] = producers;
+
+    Json::FastWriter writer;
+    outData = writer.write(root);
+}
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/HeartbeatData.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/HeartbeatData.h b/rocketmq-client4cpp/src/protocol/HeartbeatData.h
new file mode 100755
index 0000000..cb0f720
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/HeartbeatData.h
@@ -0,0 +1,157 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __HEARTBEATDATA_H__
+#define __HEARTBEATDATA_H__
+
+#include <string>
+#include <set>
+#include <sstream>
+
+#include "RocketMQClient.h"
+#include "ConsumeType.h"
+#include "SubscriptionData.h"
+#include "RemotingSerializable.h"
+#include "UtilAll.h"
+
+namespace rmq
+{
+    /**
+     * Per-consumer-group entry of a heartbeat.
+     *
+     * Entries are ordered by groupName only (see operator<), so a std::set
+     * of ConsumerData holds at most one entry per group name.
+     */
+    struct ConsumerData
+    {
+        std::string groupName;
+        ConsumeType consumeType;
+        MessageModel messageModel;
+        ConsumeFromWhere consumeFromWhere;
+        std::set<SubscriptionData> subscriptionDataSet;
+
+        // Strict weak ordering on groupName alone.
+        bool operator < (const ConsumerData& cd) const
+        {
+            return groupName < cd.groupName;
+        }
+
+        /**
+         * Fills obj with this entry's JSON members. "unitMode" is always
+         * written as false; "subscriptionDataSet" is an array with one
+         * element per subscription.
+         */
+        void toJson(Json::Value& obj) const
+        {
+            //{"consumeFromWhere":"CONSUME_FROM_LAST_OFFSET","consumeType":"CONSUME_ACTIVELY","groupName":"please_rename_unique_group_name_5","messageModel":"CLUSTERING","subscriptionDataSet":[],"unitMode":false}
+            obj["groupName"] = groupName;
+            obj["messageModel"] = getMessageModelString(messageModel);
+            obj["consumeFromWhere"] = getConsumeFromWhereString(consumeFromWhere);
+            obj["consumeType"] = getConsumeTypeString(consumeType);
+            obj["unitMode"] = false;
+
+            Json::Value subscriptions(Json::arrayValue);
+            RMQ_FOR_EACH(subscriptionDataSet, it)
+            {
+                Json::Value entry;
+                it->toJson(entry);
+                subscriptions.append(entry);
+            }
+            obj["subscriptionDataSet"] = subscriptions;
+        }
+
+        // Debug rendering of all fields as "{name=value,...}".
+        std::string toString() const
+        {
+            std::stringstream out;
+            out << "{groupName=" << groupName;
+            out << ",messageModel=" << getMessageModelString(messageModel);
+            out << ",consumeFromWhere=" << getConsumeFromWhereString(consumeFromWhere);
+            out << ",consumeType=" << getConsumeTypeString(consumeType);
+            out << ",subscriptionDataSet=" << UtilAll::toString(subscriptionDataSet);
+            out << "}";
+            return out.str();
+        }
+    };
+    // Streams a ConsumerData using its toString() rendering.
+    inline std::ostream& operator<<(std::ostream& os, const ConsumerData& obj)
+    {
+        return os << obj.toString();
+    }
+
+    /**
+     * Per-producer-group entry of a heartbeat; carries only the group name
+     * and is ordered by it.
+     */
+    struct ProducerData
+    {
+        std::string groupName;
+
+        // Strict weak ordering on groupName.
+        bool operator < (const ProducerData& pd) const
+        {
+            return groupName < pd.groupName;
+        }
+
+        // Writes the single "groupName" member into obj.
+        void toJson(Json::Value& obj) const
+        {
+            obj["groupName"] = groupName;
+        }
+
+        // Debug rendering: "{groupName=<name>}".
+        std::string toString() const
+        {
+            std::stringstream out;
+            out << "{groupName=";
+            out << groupName;
+            out << "}";
+            return out.str();
+        }
+    };
+    // Streams a ProducerData using its toString() rendering.
+    inline std::ostream& operator<<(std::ostream& os, const ProducerData& obj)
+    {
+        return os << obj.toString();
+    }
+
+
+    /**
+     * Heartbeat payload: a client id plus the sets of producer and consumer
+     * group entries. encode() is defined in the matching .cpp file.
+     */
+    class HeartbeatData : public RemotingSerializable
+    {
+    public:
+        void encode(std::string& outData);
+
+        // Returns a copy of the client id.
+        std::string getClientID()
+        {
+            return m_clientID;
+        }
+
+        void setClientID(const std::string& clientID)
+        {
+            m_clientID = clientID;
+        }
+
+        // Mutable access to the producer entries.
+        std::set<ProducerData>& getProducerDataSet()
+        {
+            return m_producerDataSet;
+        }
+
+        // Replaces the producer entries with a copy of producerDataSet.
+        void setProducerDataSet(const std::set<ProducerData>& producerDataSet)
+        {
+            m_producerDataSet = producerDataSet;
+        }
+
+        // Mutable access to the consumer entries.
+        std::set<ConsumerData>& getConsumerDataSet()
+        {
+            return m_consumerDataSet;
+        }
+
+        // Replaces the consumer entries with a copy of consumerDataSet.
+        void setConsumerDataSet(const std::set<ConsumerData>& consumerDataSet)
+        {
+            m_consumerDataSet = consumerDataSet;
+        }
+
+        // Debug rendering of the three members as "{name=value,...}".
+        std::string toString() const
+        {
+            std::stringstream out;
+            out << "{clientID=" << m_clientID;
+            out << ",producerDataSet=" << UtilAll::toString(m_producerDataSet);
+            out << ",consumerDataSet=" << UtilAll::toString(m_consumerDataSet);
+            out << "}";
+            return out.str();
+        }
+
+    private:
+        std::string m_clientID;
+        std::set<ProducerData> m_producerDataSet;
+        std::set<ConsumerData> m_consumerDataSet;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/KVTable.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/KVTable.h b/rocketmq-client4cpp/src/protocol/KVTable.h
new file mode 100755
index 0000000..726b872
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/KVTable.h
@@ -0,0 +1,58 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __KVTABLE_H__
+#define __KVTABLE_H__
+
+#include <map>
+#include <string>
+#include "RemotingSerializable.h"
+#include "UtilAll.h"
+
+namespace rmq
+{
+    // Serializable wrapper around a string-to-string table.
+    class KVTable : public RemotingSerializable
+    {
+    public:
+        // Currently a no-op: outData is left untouched.
+        void encode(std::string& outData)
+        {
+
+        }
+
+        // Renders the table as {table=...} using UtilAll::toString.
+        std::string toString() const
+        {
+            std::stringstream ss;
+            ss << "{table=" << UtilAll::toString(m_table)
+               << "}";
+            return ss.str();
+        }
+
+        // Read-only view of the table. Declared const so it can be called
+        // through a const KVTable as well; the returned reference was
+        // already const, so existing callers are unaffected.
+        const std::map<std::string, std::string>& getTable() const
+        {
+            return m_table;
+        }
+
+        // Replaces the stored table with a copy of table.
+        void setTable(const std::map<std::string, std::string>& table)
+        {
+            m_table = table;
+        }
+
+    private:
+        std::map<std::string, std::string> m_table;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp 
b/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp
new file mode 100755
index 0000000..947abe2
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp
@@ -0,0 +1,112 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "LockBatchBody.h"
+#include "UtilAll.h"
+
+namespace rmq
+{
+
+// Members are default-constructed; there is nothing else to set up.
+LockBatchRequestBody::LockBatchRequestBody()
+{
+}
+
+LockBatchRequestBody::~LockBatchRequestBody()
+{
+}
+
+// Currently a no-op: outData is left untouched.
+void LockBatchRequestBody::encode(std::string& outData)
+{
+}
+
+// Renders {consumerGroup=...,clientId=...,mqSet=...}.
+std::string LockBatchRequestBody::toString() const
+{
+    std::stringstream out;
+    out << "{consumerGroup=" << m_consumerGroup;
+    out << ",clientId=" << m_clientId;
+    out << ",mqSet=" << UtilAll::toString(m_mqSet);
+    out << "}";
+    return out.str();
+}
+
+// Returns a copy of the consumer group name.
+std::string LockBatchRequestBody::getConsumerGroup()
+{
+    return m_consumerGroup;
+}
+
+void LockBatchRequestBody::setConsumerGroup(const std::string& consumerGroup)
+{
+    m_consumerGroup = consumerGroup;
+}
+
+// Returns a copy of the client id.
+std::string LockBatchRequestBody::getClientId()
+{
+    return m_clientId;
+}
+
+void LockBatchRequestBody::setClientId(const std::string& clientId)
+{
+    m_clientId = clientId;
+}
+
+// Returns a mutable reference to the stored queue set.
+std::set<MessageQueue>& LockBatchRequestBody::getMqSet()
+{
+    return m_mqSet;
+}
+
+// Replaces the stored queue set with a copy of mqSet.
+void LockBatchRequestBody::setMqSet(const std::set<MessageQueue>& mqSet)
+{
+    m_mqSet = mqSet;
+}
+
+// Members are default-constructed, so the lock set starts out empty.
+LockBatchResponseBody::LockBatchResponseBody()
+{
+}
+
+LockBatchResponseBody::~LockBatchResponseBody()
+{
+}
+
+// Currently a no-op: outData is left untouched.
+void LockBatchResponseBody::encode(std::string& outData)
+{
+}
+
+// Renders {lockOKMQSet=...}. The field used to be printed under the label
+// "consumerGroup", which did not match the data being shown.
+std::string LockBatchResponseBody::toString() const
+{
+    std::stringstream ss;
+    ss << "{lockOKMQSet=" << UtilAll::toString(m_lockOKMQSet)
+       << "}";
+    return ss.str();
+}
+
+// Returns a newly allocated response body.
+// NOTE(review): pData and len are not read, so the returned object always
+// has an empty lock set regardless of the input - confirm whether parsing
+// of the response payload is still to be implemented.
+LockBatchResponseBody* LockBatchResponseBody::decode(const char* pData, int len)
+{
+    return new LockBatchResponseBody();
+}
+
+// Returns a copy of the set of queues recorded as locked.
+std::set<MessageQueue> LockBatchResponseBody::getLockOKMQSet()
+{
+    return m_lockOKMQSet;
+}
+
+// Replaces the stored set with a copy of lockOKMQSet.
+void LockBatchResponseBody::setLockOKMQSet(const std::set<MessageQueue>& lockOKMQSet)
+{
+    m_lockOKMQSet = lockOKMQSet;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/LockBatchBody.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/LockBatchBody.h 
b/rocketmq-client4cpp/src/protocol/LockBatchBody.h
new file mode 100755
index 0000000..ab9ee02
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/LockBatchBody.h
@@ -0,0 +1,73 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __LOCKBATCHBODY_H__
+#define __LOCKBATCHBODY_H__
+
+#include <string>
+#include <set>
+
+#include "RemotingSerializable.h"
+#include "MessageQueue.h"
+
+namespace rmq
+{
+    class LockBatchRequestBody : public RemotingSerializable  // body holding a consumer group, a client id and a set of message queues
+    {
+    public:
+        LockBatchRequestBody();
+        ~LockBatchRequestBody();
+
+        void encode(std::string& outData);  // the definition in LockBatchBody.cpp is currently empty
+               std::string toString() const;  // renders {consumerGroup=...,clientId=...,mqSet=...}
+
+        std::string getConsumerGroup();  // returns a copy
+        void setConsumerGroup(const std::string& consumerGroup);
+
+        std::string getClientId();  // returns a copy
+        void setClientId(const std::string& clientId);
+
+        std::set<MessageQueue>& getMqSet();  // mutable reference to the stored set
+        void setMqSet(const std::set<MessageQueue>& mqSet);  // stores a copy of mqSet
+
+    private:
+        std::string m_consumerGroup;
+        std::string m_clientId;
+        std::set<MessageQueue> m_mqSet;
+    };
+
+    class LockBatchResponseBody : public RemotingSerializable  // body holding the set of message queues reported as locked
+    {
+    public:
+        LockBatchResponseBody();
+        ~LockBatchResponseBody();
+
+        void encode(std::string& outData);  // the definition in LockBatchBody.cpp is currently empty
+               std::string toString() const;  // string rendering of the lock set
+
+        static LockBatchResponseBody* decode(const char* pData, int len);  // returns an object created with new; presumably the caller releases it - confirm
+
+        std::set<MessageQueue> getLockOKMQSet();  // returns a copy of the set
+        void setLockOKMQSet(const std::set<MessageQueue>& lockOKMQSet);  // stores a copy of lockOKMQSet
+
+    private:
+        std::set<MessageQueue> m_lockOKMQSet;
+    };
+
+    typedef LockBatchRequestBody UnlockBatchRequestBody;  // unlock requests reuse the lock request type unchanged
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/MQProtos.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/MQProtos.cpp 
b/rocketmq-client4cpp/src/protocol/MQProtos.cpp
new file mode 100755
index 0000000..052c104
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/MQProtos.cpp
@@ -0,0 +1,248 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "MQProtos.h"
+
+namespace rmq
+{
+
+const char* getMQRequestCodeString(int code)
+{
+    switch (code)
+    {
+               case SEND_MESSAGE_VALUE:
+                       return "SEND_MESSAGE_VALUE";
+               case PULL_MESSAGE_VALUE:
+                       return "PULL_MESSAGE_VALUE";
+               case QUERY_MESSAGE_VALUE:
+                       return "QUERY_MESSAGE_VALUE";
+               case QUERY_BROKER_OFFSET_VALUE:
+                       return "QUERY_BROKER_OFFSET_VALUE";
+               case QUERY_CONSUMER_OFFSET_VALUE:
+                       return "QUERY_CONSUMER_OFFSET_VALUE";
+               case UPDATE_CONSUMER_OFFSET_VALUE:
+                       return "UPDATE_CONSUMER_OFFSET_VALUE";
+               case UPDATE_AND_CREATE_TOPIC_VALUE:
+                       return "UPDATE_AND_CREATE_TOPIC_VALUE";
+               case GET_ALL_TOPIC_CONFIG_VALUE:
+                       return "GET_ALL_TOPIC_CONFIG_VALUE";
+               case GET_TOPIC_CONFIG_LIST_VALUE:
+                       return "GET_TOPIC_CONFIG_LIST_VALUE";
+               case GET_TOPIC_NAME_LIST_VALUE:
+                       return "GET_TOPIC_NAME_LIST_VALUE";
+               case UPDATE_BROKER_CONFIG_VALUE:
+                       return "UPDATE_BROKER_CONFIG_VALUE";
+               case GET_BROKER_CONFIG_VALUE:
+                       return "GET_BROKER_CONFIG_VALUE";
+               case TRIGGER_DELETE_FILES_VALUE:
+                       return "TRIGGER_DELETE_FILES_VALUE";
+               case GET_BROKER_RUNTIME_INFO_VALUE:
+                       return "GET_BROKER_RUNTIME_INFO_VALUE";
+               case SEARCH_OFFSET_BY_TIMESTAMP_VALUE:
+                       return "SEARCH_OFFSET_BY_TIMESTAMP_VALUE";
+               case GET_MAX_OFFSET_VALUE:
+                       return "GET_MAX_OFFSET_VALUE";
+               case GET_MIN_OFFSET_VALUE:
+                       return "GET_MIN_OFFSET_VALUE";
+               case GET_EARLIEST_MSG_STORETIME_VALUE:
+                       return "GET_EARLIEST_MSG_STORETIME_VALUE";
+               case VIEW_MESSAGE_BY_ID_VALUE:
+                       return "VIEW_MESSAGE_BY_ID_VALUE";
+               case HEART_BEAT_VALUE:
+                       return "HEART_BEAT_VALUE";
+               case UNREGISTER_CLIENT_VALUE:
+                       return "UNREGISTER_CLIENT_VALUE";
+               case CONSUMER_SEND_MSG_BACK_VALUE:
+                       return "CONSUMER_SEND_MSG_BACK_VALUE";
+               case END_TRANSACTION_VALUE:
+                       return "END_TRANSACTION_VALUE";
+               case GET_CONSUMER_LIST_BY_GROUP_VALUE:
+                       return "GET_CONSUMER_LIST_BY_GROUP_VALUE";
+               case CHECK_TRANSACTION_STATE_VALUE:
+                       return "CHECK_TRANSACTION_STATE_VALUE";
+               case NOTIFY_CONSUMER_IDS_CHANGED_VALUE:
+                       return "NOTIFY_CONSUMER_IDS_CHANGED_VALUE";
+               case LOCK_BATCH_MQ_VALUE:
+                       return "LOCK_BATCH_MQ_VALUE";
+               case UNLOCK_BATCH_MQ_VALUE:
+                       return "UNLOCK_BATCH_MQ_VALUE";
+               case GET_ALL_CONSUMER_OFFSET_VALUE:
+                       return "GET_ALL_CONSUMER_OFFSET_VALUE";
+               case GET_ALL_DELAY_OFFSET_VALUE:
+                       return "GET_ALL_DELAY_OFFSET_VALUE";
+               case PUT_KV_CONFIG_VALUE:
+                       return "PUT_KV_CONFIG_VALUE";
+               case GET_KV_CONFIG_VALUE:
+                       return "GET_KV_CONFIG_VALUE";
+               case DELETE_KV_CONFIG_VALUE:
+                       return "DELETE_KV_CONFIG_VALUE";
+               case REGISTER_BROKER_VALUE:
+                       return "REGISTER_BROKER_VALUE";
+               case UNREGISTER_BROKER_VALUE:
+                       return "UNREGISTER_BROKER_VALUE";
+               case GET_ROUTEINTO_BY_TOPIC_VALUE:
+                       return "GET_ROUTEINTO_BY_TOPIC_VALUE";
+               case GET_BROKER_CLUSTER_INFO_VALUE:
+                       return "GET_BROKER_CLUSTER_INFO_VALUE";
+               case UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_VALUE:
+                       return "UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_VALUE";
+               case GET_ALL_SUBSCRIPTIONGROUP_CONFIG_VALUE:
+                       return "GET_ALL_SUBSCRIPTIONGROUP_CONFIG_VALUE";
+               case GET_TOPIC_STATS_INFO_VALUE:
+                       return "GET_TOPIC_STATS_INFO_VALUE";
+               case GET_CONSUMER_CONNECTION_LIST_VALUE:
+                       return "GET_CONSUMER_CONNECTION_LIST_VALUE";
+               case GET_PRODUCER_CONNECTION_LIST_VALUE:
+                       return "GET_PRODUCER_CONNECTION_LIST_VALUE";
+               case WIPE_WRITE_PERM_OF_BROKER_VALUE:
+                       return "WIPE_WRITE_PERM_OF_BROKER_VALUE";
+               case GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE:
+                       return "GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE";
+               case DELETE_SUBSCRIPTIONGROUP_VALUE:
+                       return "DELETE_SUBSCRIPTIONGROUP_VALUE";
+               case GET_CONSUME_STATS_VALUE:
+                       return "GET_CONSUME_STATS_VALUE";
+               case SUSPEND_CONSUMER_VALUE:
+                       return "SUSPEND_CONSUMER_VALUE";
+               case RESUME_CONSUMER_VALUE:
+                       return "RESUME_CONSUMER_VALUE";
+               case RESET_CONSUMER_OFFSET_IN_CONSUMER_VALUE:
+                       return "RESET_CONSUMER_OFFSET_IN_CONSUMER_VALUE";
+               case RESET_CONSUMER_OFFSET_IN_BROKER_VALUE:
+                       return "RESET_CONSUMER_OFFSET_IN_BROKER_VALUE";
+               case ADJUST_CONSUMER_THREAD_POOL_VALUE:
+                       return "ADJUST_CONSUMER_THREAD_POOL_VALUE";
+               case WHO_CONSUME_THE_MESSAGE_VALUE:
+                       return "WHO_CONSUME_THE_MESSAGE_VALUE";
+               case DELETE_TOPIC_IN_BROKER_VALUE:
+                       return "DELETE_TOPIC_IN_BROKER_VALUE";
+               case DELETE_TOPIC_IN_NAMESRV_VALUE:
+                       return "DELETE_TOPIC_IN_NAMESRV_VALUE";
+               case GET_KV_CONFIG_BY_VALUE_VALUE:
+                       return "GET_KV_CONFIG_BY_VALUE_VALUE";
+               case DELETE_KV_CONFIG_BY_VALUE_VALUE:
+                       return "DELETE_KV_CONFIG_BY_VALUE_VALUE";
+               case GET_KVLIST_BY_NAMESPACE_VALUE:
+                       return "GET_KVLIST_BY_NAMESPACE_VALUE";
+               case RESET_CONSUMER_CLIENT_OFFSET_VALUE:
+                       return "RESET_CONSUMER_CLIENT_OFFSET_VALUE";
+               case GET_CONSUMER_STATUS_FROM_CLIENT_VALUE:
+                       return "GET_CONSUMER_STATUS_FROM_CLIENT_VALUE";
+               case INVOKE_BROKER_TO_RESET_OFFSET_VALUE:
+                       return "INVOKE_BROKER_TO_RESET_OFFSET_VALUE";
+               case INVOKE_BROKER_TO_GET_CONSUMER_STATUS_VALUE:
+                       return "INVOKE_BROKER_TO_GET_CONSUMER_STATUS_VALUE";
+               case QUERY_TOPIC_CONSUME_BY_WHO_VALUE:
+                       return "QUERY_TOPIC_CONSUME_BY_WHO_VALUE";
+               case GET_TOPICS_BY_CLUSTER_VALUE:
+                       return "GET_TOPICS_BY_CLUSTER_VALUE";
+               case REGISTER_FILTER_SERVER_VALUE:
+                       return "REGISTER_FILTER_SERVER_VALUE";
+               case REGISTER_MESSAGE_FILTER_CLASS_VALUE:
+                       return "REGISTER_MESSAGE_FILTER_CLASS_VALUE";
+               case QUERY_CONSUME_TIME_SPAN_VALUE:
+                       return "QUERY_CONSUME_TIME_SPAN_VALUE";
+               case GET_SYSTEM_TOPIC_LIST_FROM_NS_VALUE:
+                       return "GET_SYSTEM_TOPIC_LIST_FROM_NS_VALUE";
+               case GET_SYSTEM_TOPIC_LIST_FROM_BROKER_VALUE:
+                       return "GET_SYSTEM_TOPIC_LIST_FROM_BROKER_VALUE";
+               case CLEAN_EXPIRED_CONSUMEQUEUE_VALUE:
+                       return "CLEAN_EXPIRED_CONSUMEQUEUE_VALUE";
+               case GET_CONSUMER_RUNNING_INFO_VALUE:
+                       return "GET_CONSUMER_RUNNING_INFO_VALUE";
+               case QUERY_CORRECTION_OFFSET_VALUE:
+                       return "QUERY_CORRECTION_OFFSET_VALUE";
+               case CONSUME_MESSAGE_DIRECTLY_VALUE:
+                       return "CONSUME_MESSAGE_DIRECTLY_VALUE";
+               case SEND_MESSAGE_V2_VALUE:
+                       return "SEND_MESSAGE_V2_VALUE";
+               case GET_UNIT_TOPIC_LIST_VALUE:
+                       return "GET_UNIT_TOPIC_LIST_VALUE";
+               case GET_HAS_UNIT_SUB_TOPIC_LIST_VALUE:
+                       return "GET_HAS_UNIT_SUB_TOPIC_LIST_VALUE";
+               case GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST_VALUE:
+                       return "GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST_VALUE";
+               case CLONE_GROUP_OFFSET_VALUE:
+                       return "CLONE_GROUP_OFFSET_VALUE";
+               case VIEW_BROKER_STATS_DATA_VALUE:
+                       return "VIEW_BROKER_STATS_DATA_VALUE";
+    }
+
+    return "UnknowMQRequestCode";
+}
+
+const char* getMQResponseCodeString(int code)
+{
+    switch (code)
+    {
+               case 0:
+                       return "OK";
+               case FLUSH_DISK_TIMEOUT_VALUE:
+                       return "FLUSH_DISK_TIMEOUT_VALUE";
+               case SLAVE_NOT_AVAILABLE_VALUE:
+                       return "SLAVE_NOT_AVAILABLE_VALUE";
+               case FLUSH_SLAVE_TIMEOUT_VALUE:
+                       return "FLUSH_SLAVE_TIMEOUT_VALUE";
+               case MESSAGE_ILLEGAL_VALUE:
+                       return "MESSAGE_ILLEGAL_VALUE";
+               case SERVICE_NOT_AVAILABLE_VALUE:
+                       return "SERVICE_NOT_AVAILABLE_VALUE";
+               case VERSION_NOT_SUPPORTED_VALUE:
+                       return "VERSION_NOT_SUPPORTED_VALUE";
+               case NO_PERMISSION_VALUE:
+                       return "NO_PERMISSION_VALUE";
+               case TOPIC_NOT_EXIST_VALUE:
+                       return "TOPIC_NOT_EXIST_VALUE";
+               case TOPIC_EXIST_ALREADY_VALUE:
+                       return "TOPIC_EXIST_ALREADY_VALUE";
+               case PULL_NOT_FOUND_VALUE:
+                       return "PULL_NOT_FOUND_VALUE";
+               case PULL_RETRY_IMMEDIATELY_VALUE:
+                       return "PULL_RETRY_IMMEDIATELY_VALUE";
+               case PULL_OFFSET_MOVED_VALUE:
+                       return "PULL_OFFSET_MOVED_VALUE";
+               case QUERY_NOT_FOUND_VALUE:
+                       return "QUERY_NOT_FOUND_VALUE";
+               case SUBSCRIPTION_PARSE_FAILED_VALUE:
+                       return "SUBSCRIPTION_PARSE_FAILED_VALUE";
+               case SUBSCRIPTION_NOT_EXIST_VALUE:
+                       return "SUBSCRIPTION_NOT_EXIST_VALUE";
+               case SUBSCRIPTION_NOT_LATEST_VALUE:
+                       return "SUBSCRIPTION_NOT_LATEST_VALUE";
+               case SUBSCRIPTION_GROUP_NOT_EXIST_VALUE:
+                       return "SUBSCRIPTION_GROUP_NOT_EXIST_VALUE";
+               case TRANSACTION_SHOULD_COMMIT_VALUE:
+                       return "TRANSACTION_SHOULD_COMMIT_VALUE";
+               case TRANSACTION_SHOULD_ROLLBACK_VALUE:
+                       return "TRANSACTION_SHOULD_ROLLBACK_VALUE";
+               case TRANSACTION_STATE_UNKNOW_VALUE:
+                       return "TRANSACTION_STATE_UNKNOW_VALUE";
+               case TRANSACTION_STATE_GROUP_WRONG_VALUE:
+                       return "TRANSACTION_STATE_GROUP_WRONG_VALUE";
+               case NO_BUYER_ID_VALUE:
+                       return "NO_BUYER_ID_VALUE";
+               case NOT_IN_CURRENT_UNIT_VALUE:
+                       return "NOT_IN_CURRENT_UNIT_VALUE";
+               case CONSUMER_NOT_ONLINE_VALUE:
+                       return "CONSUMER_NOT_ONLINE_VALUE";
+               case CONSUME_MSG_TIMEOUT_VALUE:
+                       return "CONSUME_MSG_TIMEOUT_VALUE";
+    }
+
+    return "UnknowMQResponseCode";
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/MQProtos.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/MQProtos.h 
b/rocketmq-client4cpp/src/protocol/MQProtos.h
new file mode 100755
index 0000000..94167ea
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/MQProtos.h
@@ -0,0 +1,150 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __MQPROTOS_H__
+#define __MQPROTOS_H__
+
+namespace rmq
+{
+    enum MQRequestCode  // request codes; getMQRequestCodeString() returns each enumerator's spelling
+    {
+        // broker
+        SEND_MESSAGE_VALUE = 10,
+        PULL_MESSAGE_VALUE = 11,
+        QUERY_MESSAGE_VALUE = 12,
+        QUERY_BROKER_OFFSET_VALUE = 13,
+        QUERY_CONSUMER_OFFSET_VALUE = 14,
+        UPDATE_CONSUMER_OFFSET_VALUE = 15,
+        UPDATE_AND_CREATE_TOPIC_VALUE = 17,
+
+        GET_ALL_TOPIC_CONFIG_VALUE = 21,
+        GET_TOPIC_CONFIG_LIST_VALUE = 22,
+        GET_TOPIC_NAME_LIST_VALUE = 23,
+        UPDATE_BROKER_CONFIG_VALUE = 25,
+        GET_BROKER_CONFIG_VALUE = 26,
+        TRIGGER_DELETE_FILES_VALUE = 27,
+        GET_BROKER_RUNTIME_INFO_VALUE = 28,
+        SEARCH_OFFSET_BY_TIMESTAMP_VALUE = 29,
+
+        GET_MAX_OFFSET_VALUE = 30,
+        GET_MIN_OFFSET_VALUE = 31,
+        GET_EARLIEST_MSG_STORETIME_VALUE = 32,
+        VIEW_MESSAGE_BY_ID_VALUE = 33,
+        HEART_BEAT_VALUE = 34,
+        UNREGISTER_CLIENT_VALUE = 35,
+        CONSUMER_SEND_MSG_BACK_VALUE = 36,
+        END_TRANSACTION_VALUE = 37,
+        GET_CONSUMER_LIST_BY_GROUP_VALUE = 38,
+        CHECK_TRANSACTION_STATE_VALUE = 39,
+
+        NOTIFY_CONSUMER_IDS_CHANGED_VALUE = 40,
+        LOCK_BATCH_MQ_VALUE = 41,
+        UNLOCK_BATCH_MQ_VALUE = 42,
+        GET_ALL_CONSUMER_OFFSET_VALUE = 43,
+        GET_ALL_DELAY_OFFSET_VALUE = 45,
+
+        // Namesrv
+        PUT_KV_CONFIG_VALUE = 100,
+        GET_KV_CONFIG_VALUE = 101,
+        DELETE_KV_CONFIG_VALUE = 102,
+        REGISTER_BROKER_VALUE = 103,
+        UNREGISTER_BROKER_VALUE = 104,
+        GET_ROUTEINTO_BY_TOPIC_VALUE = 105,  // NOTE(review): "ROUTEINTO" looks like a typo for "ROUTEINFO"; renaming would break users of this identifier
+        GET_BROKER_CLUSTER_INFO_VALUE = 106,
+
+        // broker && namesrv
+        UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_VALUE = 200,
+        GET_ALL_SUBSCRIPTIONGROUP_CONFIG_VALUE = 201,
+        GET_TOPIC_STATS_INFO_VALUE = 202,
+        GET_CONSUMER_CONNECTION_LIST_VALUE = 203,
+        GET_PRODUCER_CONNECTION_LIST_VALUE = 204,
+        WIPE_WRITE_PERM_OF_BROKER_VALUE = 205,
+        GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE = 206,
+        DELETE_SUBSCRIPTIONGROUP_VALUE = 207,
+        GET_CONSUME_STATS_VALUE = 208,
+        SUSPEND_CONSUMER_VALUE = 209,
+
+        RESUME_CONSUMER_VALUE = 210,
+        RESET_CONSUMER_OFFSET_IN_CONSUMER_VALUE = 211,
+        RESET_CONSUMER_OFFSET_IN_BROKER_VALUE = 212,
+        ADJUST_CONSUMER_THREAD_POOL_VALUE = 213,
+        WHO_CONSUME_THE_MESSAGE_VALUE = 214,
+        DELETE_TOPIC_IN_BROKER_VALUE = 215,
+        DELETE_TOPIC_IN_NAMESRV_VALUE = 216,
+        GET_KV_CONFIG_BY_VALUE_VALUE = 217,
+        DELETE_KV_CONFIG_BY_VALUE_VALUE = 218,
+        GET_KVLIST_BY_NAMESPACE_VALUE = 219,
+
+        RESET_CONSUMER_CLIENT_OFFSET_VALUE = 220,
+        GET_CONSUMER_STATUS_FROM_CLIENT_VALUE = 221,
+        INVOKE_BROKER_TO_RESET_OFFSET_VALUE = 222,
+        INVOKE_BROKER_TO_GET_CONSUMER_STATUS_VALUE = 223,
+        GET_TOPICS_BY_CLUSTER_VALUE = 224,
+
+        QUERY_TOPIC_CONSUME_BY_WHO_VALUE = 300,
+        REGISTER_FILTER_SERVER_VALUE = 301,
+        REGISTER_MESSAGE_FILTER_CLASS_VALUE = 302,
+        QUERY_CONSUME_TIME_SPAN_VALUE = 303,
+        GET_SYSTEM_TOPIC_LIST_FROM_NS_VALUE = 304,
+        GET_SYSTEM_TOPIC_LIST_FROM_BROKER_VALUE = 305,
+        CLEAN_EXPIRED_CONSUMEQUEUE_VALUE = 306,
+        GET_CONSUMER_RUNNING_INFO_VALUE = 307,
+        QUERY_CORRECTION_OFFSET_VALUE = 308,
+        CONSUME_MESSAGE_DIRECTLY_VALUE = 309,
+
+        SEND_MESSAGE_V2_VALUE = 310,
+        GET_UNIT_TOPIC_LIST_VALUE = 311,
+        GET_HAS_UNIT_SUB_TOPIC_LIST_VALUE = 312,
+        GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST_VALUE = 313,
+        CLONE_GROUP_OFFSET_VALUE = 314,
+        VIEW_BROKER_STATS_DATA_VALUE = 315,
+    };
+
+    enum MQResponseCode  // response codes; value 0 has no enumerator here and is reported as "OK" by getMQResponseCodeString()
+    {
+        FLUSH_DISK_TIMEOUT_VALUE = 10,
+        SLAVE_NOT_AVAILABLE_VALUE = 11,
+        FLUSH_SLAVE_TIMEOUT_VALUE = 12,
+        MESSAGE_ILLEGAL_VALUE = 13,
+        SERVICE_NOT_AVAILABLE_VALUE = 14,
+        VERSION_NOT_SUPPORTED_VALUE = 15,
+        NO_PERMISSION_VALUE = 16,
+        TOPIC_NOT_EXIST_VALUE = 17,
+        TOPIC_EXIST_ALREADY_VALUE = 18,
+        PULL_NOT_FOUND_VALUE = 19,
+
+        PULL_RETRY_IMMEDIATELY_VALUE = 20,
+        PULL_OFFSET_MOVED_VALUE = 21,
+        QUERY_NOT_FOUND_VALUE = 22,
+        SUBSCRIPTION_PARSE_FAILED_VALUE = 23,
+        SUBSCRIPTION_NOT_EXIST_VALUE = 24,
+        SUBSCRIPTION_NOT_LATEST_VALUE = 25,
+        SUBSCRIPTION_GROUP_NOT_EXIST_VALUE = 26,
+
+        TRANSACTION_SHOULD_COMMIT_VALUE = 200,
+        TRANSACTION_SHOULD_ROLLBACK_VALUE = 201,
+        TRANSACTION_STATE_UNKNOW_VALUE = 202,
+        TRANSACTION_STATE_GROUP_WRONG_VALUE = 203,
+               NO_BUYER_ID_VALUE = 204,
+               NOT_IN_CURRENT_UNIT_VALUE = 205,
+               CONSUMER_NOT_ONLINE_VALUE = 206,
+               CONSUME_MSG_TIMEOUT_VALUE = 207,
+    };
+
+       const char* getMQRequestCodeString(int code);  // enumerator spelling for a request code, else "UnknowMQRequestCode"
+       const char* getMQResponseCodeString(int code);  // "OK" for 0, enumerator spelling for a response code, else "UnknowMQResponseCode"
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h 
b/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h
new file mode 100755
index 0000000..56ee4e4
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h
@@ -0,0 +1,135 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __OFFSETSERIALIZEWRAPPER_H__
+#define __OFFSETSERIALIZEWRAPPER_H__
+
+#include <map>
+#include <string>
+#include "RemotingSerializable.h"
+#include "MessageQueue.h"
+#include "AtomicValue.h"
+#include "UtilAll.h"
+#include "json/json.h"
+
+
+namespace rmq
+{
+    // JSON-serializable table of offsets keyed by MessageQueue.
+    //
+    // Serialized shape (each member name is itself a JSON-encoded queue):
+    //   {
+    //       "offsetTable":{
+    //           '{"brokerName":"broker-a","queueId":3,"topic":"TopicTest"}':0,
+    //           '{"brokerName":"broker-a","queueId":2,"topic":"TopicTest"}':0
+    //       }
+    //   }
+    class OffsetSerializeWrapper : public RemotingSerializable
+    {
+    public:
+        // Writes the offset table into outData as compact JSON.
+        void encode(std::string& outData)
+        {
+            // Start from an explicit JSON object so that an empty table is
+            // written as {} instead of null; decode() only accepts an object
+            // for "offsetTable", so null would not round-trip.
+            Json::Value offsetTable(Json::objectValue);
+            RMQ_FOR_EACH(m_offsetTable, it)
+            {
+                // Copy the key: map keys are const and the copy is what the
+                // original code serialized.
+                MessageQueue mq = it->first;
+                kpr::AtomicLong& offset = it->second;
+
+                std::string mqStr = mq.toJsonString();
+                offsetTable[mqStr] = offset.get();
+            }
+
+            Json::Value obj;
+            obj["offsetTable"] = offsetTable;
+
+            Json::FastWriter writer;
+            outData = writer.write(obj);
+        }
+
+        // Parses len bytes at pData and returns a newly allocated wrapper.
+        // Returns NULL when the input is missing, is not valid JSON, or does
+        // not contain an object-valued "offsetTable" member. Entries whose
+        // member name does not parse to a JSON object are skipped.
+        static OffsetSerializeWrapper* decode(const char* pData, int len)
+        {
+            if (pData == NULL || len <= 0)
+            {
+                return NULL;
+            }
+
+            // The buffer is length-delimited, so bound the logged text by len
+            // rather than relying on a terminating NUL.
+            RMQ_DEBUG("decode, data:%s", std::string(pData, len).c_str());
+
+            Json::Reader reader;
+            Json::Value obj;
+            if (!reader.parse(pData, pData + len, obj))
+            {
+                return NULL;
+            }
+
+            RMQ_DEBUG("decode ok");
+
+            if (!obj.isObject())
+            {
+                return NULL;
+            }
+
+            // Bind by reference: copying the Json::Value is unnecessary.
+            const Json::Value& objOffsetTable = obj["offsetTable"];
+            if (!objOffsetTable.isObject())
+            {
+                return NULL;
+            }
+
+            std::map<MessageQueue, kpr::AtomicLong> offsetTable;
+
+            const Json::Value::Members members = objOffsetTable.getMemberNames();
+            for (Json::Value::Members::const_iterator it = members.begin();
+                 it != members.end(); ++it)
+            {
+                const std::string& key = *it;
+                Json::Value objMq;
+                RMQ_DEBUG("decode, key:%s", key.c_str());
+
+                // A name that parses to something other than an object
+                // (e.g. a bare number) must be skipped as well: looking up
+                // members by name on such a value violates a jsoncpp
+                // precondition.
+                if (!reader.parse(key, objMq) || !objMq.isObject())
+                {
+                    continue;
+                }
+                RMQ_DEBUG("decode, key ok");
+
+                MessageQueue mq(objMq["topic"].asString(),
+                                objMq["brokerName"].asString(),
+                                objMq["queueId"].asInt());
+                long long offset = objOffsetTable[key].asInt64();
+
+                offsetTable[mq] = kpr::AtomicLong(offset);
+            }
+
+            // Allocate only after the table is built so nothing is left
+            // dangling if a conversion above throws.
+            OffsetSerializeWrapper* offsetWrapper = new OffsetSerializeWrapper();
+            offsetWrapper->setOffsetTable(offsetTable);
+            return offsetWrapper;
+        }
+
+        // Renders the table as {offsetTable=...} using UtilAll::toString.
+        std::string toString() const
+        {
+            std::stringstream ss;
+            ss << "{offsetTable=" << UtilAll::toString(m_offsetTable)
+               << "}";
+            return ss.str();
+        }
+
+        // Returns a mutable reference to the stored table.
+        std::map<MessageQueue, kpr::AtomicLong>& getOffsetTable()
+        {
+            return m_offsetTable;
+        }
+
+        // Replaces the stored table with a copy of table.
+        void setOffsetTable(const std::map<MessageQueue, kpr::AtomicLong>& table)
+        {
+            m_offsetTable = table;
+        }
+
+    private:
+        std::map<MessageQueue, kpr::AtomicLong> m_offsetTable;
+    };
+
+    typedef kpr::RefHandleT<OffsetSerializeWrapper> OffsetSerializeWrapperPtr;  // kpr::RefHandleT handle type for OffsetSerializeWrapper
+}
+
+#endif

Reply via email to