http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp b/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp new file mode 100755 index 0000000..fb2d2a6 --- /dev/null +++ b/rocketmq-client4cpp/src/protocol/CommandCustomHeader.cpp @@ -0,0 +1,672 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#include "CommandCustomHeader.h" + +#include <stdlib.h> +#include <unistd.h> +#include <arpa/inet.h> +#include <sstream> +#include <string> +#include <cstdlib> +#include "RemotingCommand.h" +#include "MQProtos.h" +#include "KPRUtil.h" +#include "UtilAll.h" + +#include "json/json.h" + +namespace rmq +{ + + +CommandCustomHeader* CommandCustomHeader::decode(int code, Json::Value& data, bool isResponseType) +{ + CommandCustomHeader* pCustomHeader = NULL; + + try + { + if (isResponseType) + { + switch (code) + { + case SEND_MESSAGE_VALUE: + case SEND_MESSAGE_V2_VALUE: + pCustomHeader = SendMessageResponseHeader::decode(data); + break; + case PULL_MESSAGE_VALUE: + pCustomHeader = PullMessageResponseHeader::decode(data); + break; + case QUERY_CONSUMER_OFFSET_VALUE: + pCustomHeader = QueryConsumerOffsetResponseHeader::decode(data); + break; + case SEARCH_OFFSET_BY_TIMESTAMP_VALUE: + pCustomHeader = SearchOffsetResponseHeader::decode(data); + break; + case GET_MAX_OFFSET_VALUE: + pCustomHeader = GetMaxOffsetResponseHeader::decode(data); + break; + case GET_MIN_OFFSET_VALUE: + pCustomHeader = GetMinOffsetResponseHeader::decode(data); + break; + case GET_EARLIEST_MSG_STORETIME_VALUE: + pCustomHeader = GetEarliestMsgStoretimeResponseHeader::decode(data); + break; + case QUERY_MESSAGE_VALUE: + pCustomHeader = QueryMessageResponseHeader::decode(data); + break; + case GET_KV_CONFIG_VALUE: + pCustomHeader = GetKVConfigResponseHeader::decode(data); + break; + + default: + break; + } + } + else + { + switch (code) + { + case NOTIFY_CONSUMER_IDS_CHANGED_VALUE: + pCustomHeader = NotifyConsumerIdsChangedRequestHeader::decode(data); + break; + case GET_CONSUMER_RUNNING_INFO_VALUE: + pCustomHeader = GetConsumerRunningInfoRequestHeader::decode(data); + break; + default: + break; + } + } + } + catch(std::exception& e) + { + if (pCustomHeader != NULL) + { + delete pCustomHeader; + pCustomHeader = NULL; + } + RMQ_ERROR("CommandCustomHeader decode exception, %d, %d, %s, %s", + code, 
isResponseType, UtilAll::toString(data).c_str(), e.what()); + } + catch(...) + { + if (pCustomHeader != NULL) + { + delete pCustomHeader; + pCustomHeader = NULL; + } + RMQ_ERROR("CommandCustomHeader decode exception, %d, %d, %s", + code, isResponseType, UtilAll::toString(data).c_str()); + } + + return pCustomHeader; +} + + +//////////////////////////////////////////////////////////////////////////////// +//GET_ROUTEINTO_BY_TOPIC_VALUE +//////////////////////////////////////////////////////////////////////////////// +void GetRouteInfoRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"topic\":\"" << topic << "\"" + << "}"; + + outData = ss.str(); +} + + +//////////////////////////////////////////////////////////////////////////////// +// UPDATE_AND_CREATE_TOPIC_VALUE +//////////////////////////////////////////////////////////////////////////////// +void CreateTopicRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + + ss << "{" + << "\"topic\":\"" << topic << "\"," + << "\"defaultTopic\":\"" << defaultTopic << "\"," + << "\"readQueueNums\":\"" << readQueueNums << "\"," + << "\"writeQueueNums\":\"" << writeQueueNums << "\"," + << "\"perm\":\"" << perm << "\"," + << "\"topicFilterType\":\"" << topicFilterType << "\"," + << "\"topicSysFlag\":\"" << topicFilterType << "\"," + << "\"order\":\"" << topicFilterType << "\"" + << "}"; + + outData = ss.str(); +} + + +//////////////////////////////////////////////////////////////////////////////// +// SEND_MESSAGE_VALUE/SEND_MESSAGE_V2_VALUE +//////////////////////////////////////////////////////////////////////////////// +void SendMessageRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + + ss << "{" + << "\"producerGroup\":\"" << producerGroup << "\"," + << "\"topic\":\"" << topic << "\"," + << "\"defaultTopic\":\"" << defaultTopic << "\"," + << "\"defaultTopicQueueNums\":" << defaultTopicQueueNums << "," + << "\"queueId\":" << queueId << "," + 
<< "\"sysFlag\":" << sysFlag << "," + << "\"bornTimestamp\":" << bornTimestamp << "," + << "\"flag\":" << flag << "," + << "\"properties\":\"" << properties << "\"," + << "\"reconsumeTimes\":" << reconsumeTimes + << "}"; + + outData = ss.str(); +} + +void SendMessageRequestHeaderV2::encode(std::string& outData) +{ + std::stringstream ss; + + ss << "{" + << "\"a\":\"" << a << "\"," + << "\"b\":\"" << b << "\"," + << "\"c\":\"" << c << "\"," + << "\"d\":\"" << d << "\"," + << "\"e\":\"" << e << "\"," + << "\"f\":\"" << f << "\"," + << "\"g\":\"" << g << "\"," + << "\"h\":\"" << h << "\"," + << "\"i\":\"" << i << "\"," + << "\"j\":\"" << j << "\"" + << "}"; + + outData = ss.str(); +} + +SendMessageRequestHeader* SendMessageRequestHeaderV2::createSendMessageRequestHeaderV1( + const SendMessageRequestHeaderV2* v2) +{ + SendMessageRequestHeader* v1 = new SendMessageRequestHeader(); + v1->producerGroup = v2->a; + v1->topic = v2->b; + v1->defaultTopic = v2->c; + v1->defaultTopicQueueNums = v2->d; + v1->queueId = v2->e; + v1->sysFlag = v2->f; + v1->bornTimestamp = v2->g; + v1->flag = v2->h; + v1->properties = v2->i; + v1->reconsumeTimes = v2->j; + + return v1; +} + +SendMessageRequestHeaderV2* SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2( + const SendMessageRequestHeader* v1) +{ + SendMessageRequestHeaderV2* v2 = new SendMessageRequestHeaderV2(); + v2->a = v1->producerGroup; + v2->b = v1->topic; + v2->c = v1->defaultTopic; + v2->d = v1->defaultTopicQueueNums; + v2->e = v1->queueId; + v2->f = v1->sysFlag; + v2->g = v1->bornTimestamp; + v2->h = v1->flag; + v2->i = v1->properties; + v2->j = v1->reconsumeTimes; + + return v2; +} + +void SendMessageResponseHeader::encode(std::string& outData) +{ +} + +CommandCustomHeader* SendMessageResponseHeader::decode(Json::Value& data) +{ + std::string msgId = data["msgId"].asString(); + int queueId = atoi(data["queueId"].asCString()); + long long queueOffset = KPRUtil::str2ll(data["queueOffset"].asCString()); + + 
SendMessageResponseHeader* h = new SendMessageResponseHeader(); + + h->msgId = msgId; + h->queueId = queueId; + h->queueOffset = queueOffset; + + return h; +} + + +//////////////////////////////////////////////////////////////////////////////// +// PULL_MESSAGE_VALUE +//////////////////////////////////////////////////////////////////////////////// +void PullMessageRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + + ss << "{" + << "\"consumerGroup\":\"" << consumerGroup << "\"," + << "\"topic\":\"" << topic << "\"," + << "\"queueId\":\"" << queueId << "\"," + << "\"queueOffset\":\"" << queueOffset << "\"," + << "\"maxMsgNums\":\"" << maxMsgNums << "\"," + << "\"sysFlag\":\"" << sysFlag << "\"," + << "\"commitOffset\":\"" << commitOffset << "\"," + << "\"suspendTimeoutMillis\":\"" << suspendTimeoutMillis << "\"," + << "\"subscription\":\"" << subscription << "\"," + << "\"subVersion\":\"" << subVersion << "\"" + << "}"; + + outData = ss.str(); +} + +void PullMessageResponseHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"suggestWhichBrokerId\":\"" << suggestWhichBrokerId << "\"," + << "\"nextBeginOffset\":\"" << nextBeginOffset << "\"," + << "\"minOffset\":\"" << minOffset << "\"," + << "\"maxOffset\":\"" << maxOffset << "\"" + << "}"; + outData = ss.str(); +} + +CommandCustomHeader* PullMessageResponseHeader::decode(Json::Value& data) +{ + long long suggestWhichBrokerId = KPRUtil::str2ll(data["suggestWhichBrokerId"].asCString()); + long long nextBeginOffset = KPRUtil::str2ll(data["nextBeginOffset"].asCString()); + long long minOffset = KPRUtil::str2ll(data["minOffset"].asCString()); + long long maxOffset = KPRUtil::str2ll(data["maxOffset"].asCString()); + + PullMessageResponseHeader* h = new PullMessageResponseHeader(); + h->suggestWhichBrokerId = suggestWhichBrokerId; + h->nextBeginOffset = nextBeginOffset; + h->minOffset = minOffset; + h->maxOffset = maxOffset; + + return h; +} + + + 
+//////////////////////////////////////////////////////////////////////////////// +// GET_CONSUMER_LIST_BY_GROUP_VALUE +//////////////////////////////////////////////////////////////////////////////// +void GetConsumerListByGroupRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + + ss << "{" + << "\"consumerGroup\":\"" << consumerGroup << "\"" + << "}"; + + outData = ss.str(); +} + + +//////////////////////////////////////////////////////////////////////////////// +// CONSUMER_SEND_MSG_BACK_VALUE +//////////////////////////////////////////////////////////////////////////////// +void ConsumerSendMsgBackRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + + ss << "{" + << "\"offset\":\"" << offset << "\"," + << "\"group\":\"" << group << "\"," + << "\"delayLevel\":\"" << delayLevel << "\"" + << "}"; + + outData = ss.str(); +} + + +//////////////////////////////////////////////////////////////////////////////// +// QUERY_CONSUMER_OFFSET_VALUE +//////////////////////////////////////////////////////////////////////////////// +void QueryConsumerOffsetRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"consumerGroup\":\"" << consumerGroup << "\"," + << "\"topic\":\"" << topic << "\"," + << "\"queueId\":\"" << queueId << "\"" + << "}"; + outData = ss.str(); +} + +void QueryConsumerOffsetResponseHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"offset\":\"" << offset << "\"" + << "}"; + outData = ss.str(); +} + +CommandCustomHeader* QueryConsumerOffsetResponseHeader::decode(Json::Value& data) +{ + long long offset = -1; + + if (data.isMember("offset")) + { + offset = KPRUtil::str2ll(data["offset"].asCString()); + } + + QueryConsumerOffsetResponseHeader* h = new QueryConsumerOffsetResponseHeader(); + h->offset = offset; + + return h; +} + + +//////////////////////////////////////////////////////////////////////////////// +// UPDATE_CONSUMER_OFFSET_VALUE 
+//////////////////////////////////////////////////////////////////////////////// +void UpdateConsumerOffsetRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"consumerGroup\":\"" << consumerGroup << "\"," + << "\"topic\":\"" << topic << "\"," + << "\"queueId\":\"" << queueId << "\"," + << "\"commitOffset\":\"" << commitOffset << "\"" + << "}"; + outData = ss.str(); +} + + +//////////////////////////////////////////////////////////////////////////////// +// UNREGISTER_CLIENT_VALUE +//////////////////////////////////////////////////////////////////////////////// +void UnregisterClientRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"producerGroup\":\"" << producerGroup << "\"," + << "\"consumerGroup\":\"" << consumerGroup << "\"," + << "\"clientID\":\"" << clientID << "\"" + << "}"; + outData = ss.str(); +} + + +/////////////////////////////////////////////////////////////////////// +// VIEW_MESSAGE_BY_ID_VALUE +/////////////////////////////////////////////////////////////////////// +void ViewMessageRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"offset\":" << offset + << "}"; + outData = ss.str(); +} + + +/////////////////////////////////////////////////////////////////////// +// SEARCH_OFFSET_BY_TIMESTAMP_VALUE +/////////////////////////////////////////////////////////////////////// +void SearchOffsetRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"topic\":\"" << topic << "\"," + << "\"queueId\":\"" << queueId << "\"," + << "\"timestamp\":\"" << timestamp << "\"" + << "}"; + outData = ss.str(); +} + +void SearchOffsetResponseHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"offset\":\"" << offset << "\"" + << "}"; + outData = ss.str(); +} + +CommandCustomHeader* SearchOffsetResponseHeader::decode(Json::Value& data) +{ + long long offset = 
KPRUtil::str2ll(data["offset"].asCString()); + + SearchOffsetResponseHeader* h = new SearchOffsetResponseHeader(); + h->offset = offset; + + return h; +} + + +/////////////////////////////////////////////////////////////////////// +// GET_MAX_OFFSET_VALUE +/////////////////////////////////////////////////////////////////////// +void GetMaxOffsetRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"topic\":\"" << topic << "\"," + << "\"queueId\":\"" << queueId << "\"" + << "}"; + outData = ss.str(); +} + +void GetMaxOffsetResponseHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"offset\":\"" << offset << "\"" + << "}"; + outData = ss.str(); +} + +CommandCustomHeader* GetMaxOffsetResponseHeader::decode(Json::Value& data) +{ + long long offset = KPRUtil::str2ll(data["offset"].asCString()); + + GetMaxOffsetResponseHeader* h = new GetMaxOffsetResponseHeader(); + h->offset = offset; + + return h; +} + + +/////////////////////////////////////////////////////////////////////// +// GET_MIN_OFFSET_VALUE +/////////////////////////////////////////////////////////////////////// +void GetMinOffsetRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"topic\":\"" << topic << "\"," + << "\"queueId\":\"" << queueId << "\"" + << "}"; + outData = ss.str(); +} + +void GetMinOffsetResponseHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"offset\":\"" << offset << "\"" + << "}"; + outData = ss.str(); +} + +CommandCustomHeader* GetMinOffsetResponseHeader::decode(Json::Value& data) +{ + long long offset = KPRUtil::str2ll(data["offset"].asCString()); + + GetMinOffsetResponseHeader* h = new GetMinOffsetResponseHeader(); + h->offset = offset; + + return h; +} + + + +/////////////////////////////////////////////////////////////////////// +// GET_EARLIEST_MSG_STORETIME_VALUE +/////////////////////////////////////////////////////////////////////// +void 
GetEarliestMsgStoretimeRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"topic\":\"" << topic << "\"," + << "\"queueId\":\"" << queueId << "\"" + << "}"; + outData = ss.str(); +} + +void GetEarliestMsgStoretimeResponseHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"timestamp\":\"" << timestamp << "\"" + << "}"; + outData = ss.str(); +} + + +CommandCustomHeader* GetEarliestMsgStoretimeResponseHeader::decode(Json::Value& data) +{ + long long timestamp = KPRUtil::str2ll(data["timestamp"].asCString()); + + GetEarliestMsgStoretimeResponseHeader* h = new GetEarliestMsgStoretimeResponseHeader(); + h->timestamp = timestamp; + + return h; +} + + +/////////////////////////////////////////////////////////////////////// +// QUERY_MESSAGE_VALUE +/////////////////////////////////////////////////////////////////////// +void QueryMessageRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"topic\":\"" << topic << "\"," + << "\"key\":\"" << key << "\"," + << "\"maxNum\":\"" << maxNum << "\"," + << "\"beginTimestamp\":\"" << beginTimestamp << "\"," + << "\"endTimestamp\":\"" << endTimestamp << "\"" + << "}"; + outData = ss.str(); +} + +void QueryMessageResponseHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"indexLastUpdateTimestamp\":\"" << indexLastUpdateTimestamp << "\"," + << "\"indexLastUpdatePhyoffset\":\"" << indexLastUpdatePhyoffset << "\"" + << "}"; + outData = ss.str(); +} + +CommandCustomHeader* QueryMessageResponseHeader::decode(Json::Value& data) +{ + long long indexLastUpdateTimestamp = KPRUtil::str2ll(data["indexLastUpdateTimestamp"].asCString()); + long long indexLastUpdatePhyoffset = KPRUtil::str2ll(data["indexLastUpdatePhyoffset"].asCString()); + + QueryMessageResponseHeader* h = new QueryMessageResponseHeader(); + h->indexLastUpdateTimestamp = indexLastUpdateTimestamp; + h->indexLastUpdatePhyoffset = 
indexLastUpdatePhyoffset; + + return h; +} + + +/////////////////////////////////////////////////////////////////////// +// GET_KV_CONFIG_VALUE +/////////////////////////////////////////////////////////////////////// +void GetKVConfigRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"namespace\":\"" << namespace_ << "\"," + << "\"key\":\"" << key << "\"" + << "}"; + outData = ss.str(); +} + +void GetKVConfigResponseHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"value\":\"" << value << "\"" + << "}"; + outData = ss.str(); +} + +CommandCustomHeader* GetKVConfigResponseHeader::decode(Json::Value& data) +{ + GetKVConfigResponseHeader* h = new GetKVConfigResponseHeader(); + h->value = data["value"].asString(); + + return h; +} + + +/////////////////////////////////////////////////////////////////////// +// NOTIFY_CONSUMER_IDS_CHANGED_VALUE +/////////////////////////////////////////////////////////////////////// +void NotifyConsumerIdsChangedRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"consumerGroup\":\"" << consumerGroup << "\"" + << "}"; + outData = ss.str(); +} + +CommandCustomHeader* NotifyConsumerIdsChangedRequestHeader::decode(Json::Value& data) +{ + NotifyConsumerIdsChangedRequestHeader* h = new NotifyConsumerIdsChangedRequestHeader(); + h->consumerGroup = data["consumerGroup"].asString(); + + return h; +} + + +/////////////////////////////////////////////////////////////////////// +// GET_CONSUMER_RUNNING_INFO_VALUE +/////////////////////////////////////////////////////////////////////// +void GetConsumerRunningInfoRequestHeader::encode(std::string& outData) +{ + std::stringstream ss; + ss << "{" + << "\"consumerGroup\":\"" << consumerGroup << "\"," + << "\"clientId\":\"" << clientId << "\"," + << "\"jstackEnable\":\"" << jstackEnable << "\"," + << "}"; + outData = ss.str(); +} + +CommandCustomHeader* 
GetConsumerRunningInfoRequestHeader::decode(Json::Value& data) +{ + GetConsumerRunningInfoRequestHeader* h = new GetConsumerRunningInfoRequestHeader(); + h->consumerGroup = data["consumerGroup"].asString(); + h->clientId = data["clientId"].asString(); + h->jstackEnable = false;//not support + + return h; +} + + +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h b/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h new file mode 100755 index 0000000..93f811a --- /dev/null +++ b/rocketmq-client4cpp/src/protocol/CommandCustomHeader.h @@ -0,0 +1,604 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#ifndef __COMMANDCUSTOMHEADER_H__ +#define __COMMANDCUSTOMHEADER_H__ + +#include <string> +#include <json/json.h> + +namespace rmq +{ + /** + * RemotingCommand custom header + * + */ + class CommandCustomHeader + { + public : + virtual ~CommandCustomHeader() {} + virtual void encode(std::string& outData) = 0; + static CommandCustomHeader* decode(int code, Json::Value& data, bool isResponseType); + }; + + /////////////////////////////////////////////////////////////////////// + // GET_ROUTEINTO_BY_TOPIC_VALUE + /////////////////////////////////////////////////////////////////////// + class GetRouteInfoRequestHeader : public CommandCustomHeader + { + public: + GetRouteInfoRequestHeader() + { + }; + ~GetRouteInfoRequestHeader() {}; + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string topic; + }; + + /////////////////////////////////////////////////////////////////////// + // UPDATE_AND_CREATE_TOPIC_VALUE + /////////////////////////////////////////////////////////////////////// + class CreateTopicRequestHeader : public CommandCustomHeader + { + public: + CreateTopicRequestHeader() + { + readQueueNums = 0; + writeQueueNums = 0; + perm = 0; + topicSysFlag = 0; + order = false; + }; + ~CreateTopicRequestHeader() {}; + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string topic; + std::string defaultTopic; + int readQueueNums; + int writeQueueNums; + int perm; + std::string topicFilterType; + int topicSysFlag; + bool order; + }; + + /////////////////////////////////////////////////////////////////////// + // SEND_MESSAGE_VALUE/SEND_MESSAGE_V2_VALUE + /////////////////////////////////////////////////////////////////////// + class SendMessageRequestHeader: public CommandCustomHeader + { + public: + SendMessageRequestHeader() + : defaultTopicQueueNums(0),queueId(0),sysFlag(0), + bornTimestamp(0),flag(0),reconsumeTimes(0) + { 
+ }; + ~SendMessageRequestHeader() {}; + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string producerGroup; + std::string topic; + std::string defaultTopic; + int defaultTopicQueueNums; + int queueId; + int sysFlag; + long long bornTimestamp; + int flag; + std::string properties; + int reconsumeTimes; + }; + + class SendMessageRequestHeaderV2: public CommandCustomHeader + { + public: + SendMessageRequestHeaderV2() + : d(0),e(0),f(0), + g(0),h(0),j(0) + { + }; + ~SendMessageRequestHeaderV2() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + static SendMessageRequestHeader* createSendMessageRequestHeaderV1(const SendMessageRequestHeaderV2* v2); + static SendMessageRequestHeaderV2* createSendMessageRequestHeaderV2(const SendMessageRequestHeader* v1); + public: + std::string a; //producerGroup + std::string b; //topic + std::string c; //defaultTopic + int d; //defaultTopicQueueNums + int e; //queueId + int f; //sysFlag + long long g; //bornTimestamp + int h; //flag + std::string i; //properties + int j; //reconsumeTimes + }; + + class SendMessageResponseHeader: public CommandCustomHeader + { + public: + SendMessageResponseHeader() + { + queueId = 0; + queueOffset = 0; + }; + ~SendMessageResponseHeader() {}; + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string msgId; + int queueId; + long long queueOffset; + }; + + + /////////////////////////////////////////////////////////////////////// + // PULL_MESSAGE_VALUE + /////////////////////////////////////////////////////////////////////// + class PullMessageRequestHeader: public CommandCustomHeader + { + public: + PullMessageRequestHeader() + { + queueId = 0; + queueOffset = 0; + maxMsgNums = 0; + sysFlag = 0; + commitOffset = 0; + suspendTimeoutMillis = 0; + subVersion = 0; + }; + ~PullMessageRequestHeader() {}; + 
virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string consumerGroup; + std::string topic; + int queueId; + long long queueOffset; + int maxMsgNums; + int sysFlag; + long long commitOffset; + long long suspendTimeoutMillis; + std::string subscription; + long long subVersion; + }; + + class PullMessageResponseHeader: public CommandCustomHeader + { + public: + PullMessageResponseHeader() + { + suggestWhichBrokerId = 0; + nextBeginOffset = 0; + minOffset = 0; + maxOffset = 0; + }; + ~PullMessageResponseHeader() {}; + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + long long suggestWhichBrokerId; + long long nextBeginOffset; + long long minOffset; + long long maxOffset; + }; + + /////////////////////////////////////////////////////////////////////// + // GET_CONSUMER_LIST_BY_GROUP_VALUE + /////////////////////////////////////////////////////////////////////// + class GetConsumerListByGroupRequestHeader : public CommandCustomHeader + { + public: + GetConsumerListByGroupRequestHeader() {}; + ~GetConsumerListByGroupRequestHeader() {}; + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string consumerGroup; + }; + + + /////////////////////////////////////////////////////////////////////// + // CONSUMER_SEND_MSG_BACK_VALUE + /////////////////////////////////////////////////////////////////////// + class ConsumerSendMsgBackRequestHeader : public CommandCustomHeader + { + public: + ConsumerSendMsgBackRequestHeader() + { + offset = 0; + delayLevel = 0; + }; + ~ConsumerSendMsgBackRequestHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + long long offset; + std::string group; + int delayLevel; + }; + + + /////////////////////////////////////////////////////////////////////// + // 
QUERY_CONSUMER_OFFSET_VALUE + /////////////////////////////////////////////////////////////////////// + class QueryConsumerOffsetRequestHeader : public CommandCustomHeader + { + public: + QueryConsumerOffsetRequestHeader() + { + queueId = 0; + }; + ~QueryConsumerOffsetRequestHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string consumerGroup; + std::string topic; + int queueId; + }; + + class QueryConsumerOffsetResponseHeader : public CommandCustomHeader + { + public: + QueryConsumerOffsetResponseHeader() + { + offset = 0; + }; + ~QueryConsumerOffsetResponseHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + long long offset; + }; + + /////////////////////////////////////////////////////////////////////// + // UPDATE_CONSUMER_OFFSET_VALUE + /////////////////////////////////////////////////////////////////////// + class UpdateConsumerOffsetRequestHeader : public CommandCustomHeader + { + public: + UpdateConsumerOffsetRequestHeader() + { + queueId = 0; + commitOffset = 0; + }; + ~UpdateConsumerOffsetRequestHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string consumerGroup; + std::string topic; + int queueId; + long long commitOffset; + }; + + /////////////////////////////////////////////////////////////////////// + // UNREGISTER_CLIENT_VALUE + /////////////////////////////////////////////////////////////////////// + class UnregisterClientRequestHeader : public CommandCustomHeader + { + public: + UnregisterClientRequestHeader() {}; + ~UnregisterClientRequestHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string clientID; + std::string producerGroup; + std::string consumerGroup; + }; + + + 
/////////////////////////////////////////////////////////////////////// + // VIEW_MESSAGE_BY_ID_VALUE + /////////////////////////////////////////////////////////////////////// + class ViewMessageRequestHeader : public CommandCustomHeader + { + public: + ViewMessageRequestHeader() + { + offset = 0; + }; + ~ViewMessageRequestHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + long long offset; + }; + + /////////////////////////////////////////////////////////////////////// + // SEARCH_OFFSET_BY_TIMESTAMP_VALUE + /////////////////////////////////////////////////////////////////////// + class SearchOffsetRequestHeader : public CommandCustomHeader + { + public: + SearchOffsetRequestHeader() + { + queueId = 0; + timestamp = 0; + }; + ~SearchOffsetRequestHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string topic; + int queueId; + long long timestamp; + }; + + class SearchOffsetResponseHeader : public CommandCustomHeader + { + public: + SearchOffsetResponseHeader() + { + offset = 0; + }; + ~SearchOffsetResponseHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + long long offset; + }; + + /////////////////////////////////////////////////////////////////////// + // GET_MAX_OFFSET_VALUE + /////////////////////////////////////////////////////////////////////// + class GetMaxOffsetRequestHeader : public CommandCustomHeader + { + public: + GetMaxOffsetRequestHeader() + { + queueId = 0; + }; + ~GetMaxOffsetRequestHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string topic; + int queueId; + }; + + class GetMaxOffsetResponseHeader : public CommandCustomHeader + { + public: + GetMaxOffsetResponseHeader() + { + offset = 0; + }; + 
~GetMaxOffsetResponseHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + long long offset; + }; + + /////////////////////////////////////////////////////////////////////// + // GET_MIN_OFFSET_VALUE + /////////////////////////////////////////////////////////////////////// + class GetMinOffsetRequestHeader : public CommandCustomHeader + { + public: + GetMinOffsetRequestHeader() + { + queueId = 0; + }; + ~GetMinOffsetRequestHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string topic; + int queueId; + }; + + class GetMinOffsetResponseHeader : public CommandCustomHeader + { + public: + GetMinOffsetResponseHeader() + { + offset = 0; + }; + ~GetMinOffsetResponseHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + long long offset; + }; + + + /////////////////////////////////////////////////////////////////////// + // GET_EARLIEST_MSG_STORETIME_VALUE + /////////////////////////////////////////////////////////////////////// + class GetEarliestMsgStoretimeRequestHeader : public CommandCustomHeader + { + public: + GetEarliestMsgStoretimeRequestHeader() + { + queueId = 0; + }; + ~GetEarliestMsgStoretimeRequestHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string topic; + int queueId; + }; + + class GetEarliestMsgStoretimeResponseHeader : public CommandCustomHeader + { + public: + GetEarliestMsgStoretimeResponseHeader() + { + timestamp = 0; + }; + ~GetEarliestMsgStoretimeResponseHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + long long timestamp; + }; + + /////////////////////////////////////////////////////////////////////// + // QUERY_MESSAGE_VALUE + 
/////////////////////////////////////////////////////////////////////// + class QueryMessageRequestHeader : public CommandCustomHeader + { + public: + QueryMessageRequestHeader() + { + maxNum = 0; + beginTimestamp = 0; + endTimestamp = 0; + }; + ~QueryMessageRequestHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string topic; + std::string key; + int maxNum; + long long beginTimestamp; + long long endTimestamp; + }; + + class QueryMessageResponseHeader : public CommandCustomHeader + { + public: + QueryMessageResponseHeader() + { + indexLastUpdateTimestamp = 0; + indexLastUpdatePhyoffset = 0; + }; + ~QueryMessageResponseHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + long long indexLastUpdateTimestamp; + long long indexLastUpdatePhyoffset; + }; + + /////////////////////////////////////////////////////////////////////// + // GET_KV_CONFIG_VALUE + /////////////////////////////////////////////////////////////////////// + class GetKVConfigRequestHeader : public CommandCustomHeader + { + public: + GetKVConfigRequestHeader() {}; + ~GetKVConfigRequestHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string namespace_; + std::string key; + }; + + class GetKVConfigResponseHeader : public CommandCustomHeader + { + public: + GetKVConfigResponseHeader() {}; + ~GetKVConfigResponseHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string value; + }; + + /////////////////////////////////////////////////////////////////////// + // NOTIFY_CONSUMER_IDS_CHANGED_VALUE + /////////////////////////////////////////////////////////////////////// + class NotifyConsumerIdsChangedRequestHeader : public CommandCustomHeader + { + public: + 
NotifyConsumerIdsChangedRequestHeader() {}; + ~NotifyConsumerIdsChangedRequestHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string consumerGroup; + }; + + + /////////////////////////////////////////////////////////////////////// + // GET_CONSUMER_RUNNING_INFO_VALUE + /////////////////////////////////////////////////////////////////////// + class GetConsumerRunningInfoRequestHeader : public CommandCustomHeader + { + public: + GetConsumerRunningInfoRequestHeader() {}; + ~GetConsumerRunningInfoRequestHeader() {}; + + virtual void encode(std::string& outData); + static CommandCustomHeader* decode(Json::Value& data); + + public: + std::string consumerGroup; + std::string clientId; + bool jstackEnable; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp b/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp new file mode 100755 index 0000000..58cecde --- /dev/null +++ b/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.cpp @@ -0,0 +1,168 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/
+#include "ConsumerRunningInfo.h"
+
+namespace rmq
+{
+
+// Property-name constants.
+// NOTE(review): the value of PROP_CONSUME_ORDERLY has no underscore between
+// CONSUME and ORDERLY, unlike its identifier. Presumably this must match the
+// key used by the peer implementation - confirm before "fixing" it.
+const std::string ConsumerRunningInfo::PROP_NAMESERVER_ADDR = "PROP_NAMESERVER_ADDR";
+const std::string ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE = "PROP_THREADPOOL_CORE_SIZE";
+const std::string ConsumerRunningInfo::PROP_CONSUME_ORDERLY = "PROP_CONSUMEORDERLY";
+const std::string ConsumerRunningInfo::PROP_CONSUME_TYPE = "PROP_CONSUME_TYPE";
+const std::string ConsumerRunningInfo::PROP_CLIENT_VERSION = "PROP_CLIENT_VERSION";
+const std::string ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP = "PROP_CONSUMER_START_TIMESTAMP";
+
+
+ConsumerRunningInfo::ConsumerRunningInfo()
+{
+}
+
+ConsumerRunningInfo::~ConsumerRunningInfo()
+{
+}
+
+// Writes an empty JSON object to outData; no fields are serialized.
+void ConsumerRunningInfo::encode(std::string& outData)
+{
+    outData = "{}";
+}
+
+
+// Returns a fixed "not supported" message.
+//
+// The commented-out reference code that used to sit in this function built a
+// text report with the sections below, in this order. They are listed here so
+// a future port knows what to produce:
+//   1. "#Consumer Properties#"   - every entry of m_properties, "%-40s: %s\n"
+//   2. "#Consumer Subscription#" - every entry of m_subscriptionSet: running
+//                                  index, topic, class-filter flag, sub string
+//   3. "#Consumer Offset#"       - every entry of m_mqTable: topic, broker
+//                                  name, queue id, commit offset
+//   4. "#Consumer MQ Detail#"    - every entry of m_mqTable: topic, broker
+//                                  name, queue id, ProcessQueueInfo text
+//   5. "#Consumer RT&TPS#"       - every entry of m_statusTable: pull RT,
+//                                  pull TPS, consume RT, consume-OK TPS,
+//                                  consume-failed TPS, failed msgs in hour
+//   6. "#Consumer jstack#"       - m_jstack, only when it is set
+std::string ConsumerRunningInfo::formatString()
+{
+    return "rocketmq-client4cpp not support this feature";
+}
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h
----------------------------------------------------------------------
diff
--git a/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h b/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h
new file mode 100755
index 0000000..588bf07
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/ConsumerRunningInfo.h
@@ -0,0 +1,97 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __ConsumerRunningInfo_H__
+#define __ConsumerRunningInfo_H__
+
+#include <string>
+#include <set>
+#include <map>
+
+#include "RemotingSerializable.h"
+#include "MessageQueue.h"
+#include "SubscriptionData.h"
+#include "ConsumerStatManage.h"
+
+namespace rmq
+{
+    // Serializable description of a running consumer.
+    // Only encode(), formatString() and the PROP_* key names are active;
+    // the accessors and data members are commented out below.
+    class ConsumerRunningInfo : public RemotingSerializable
+    {
+    public:
+        ConsumerRunningInfo();
+        ~ConsumerRunningInfo();
+
+        // Disabled accessors for the (also disabled) data members.
+        /*
+        std::map<std::string, std::string>& getProperties()
+        {
+            return m_properties;
+        }
+        void setProperties(const std::map<std::string, std::string>& properties)
+        {
+            m_properties = properties;
+        }
+
+        std::map<MessageQueue, ProcessQueueInfo>& getMqTable()
+        {
+            return m_mqTable;
+        }
+        void setMqTable(const std::map<MessageQueue, ProcessQueueInfo>& mqTable)
+        {
+            m_mqTable = mqTable;
+        }
+
+        std::map<std::string, ConsumeStatus>& getStatusTable()
+        {
+            return m_statusTable;
+        }
+        void setStatusTable(const std::map<std::string, ConsumeStatus>& statusTable)
+        {
+            m_statusTable = statusTable;
+        }
+
+        std::set<SubscriptionData>& getSubscriptionSet()
+        {
+            return m_subscriptionSet;
+        }
+        void setSubscriptionSet(const std::set<SubscriptionData>& subscriptionSet)
+        {
+            m_subscriptionSet = subscriptionSet;
+        }
+        */
+
+        // Serializes this object into outData.
+        void encode(std::string& outData);
+        // Returns a human-readable text form of this object.
+        std::string formatString();
+
+    public:
+        // Property key names; the values are defined in the .cpp file.
+        static const std::string PROP_NAMESERVER_ADDR;
+        static const std::string PROP_THREADPOOL_CORE_SIZE;
+        static const std::string PROP_CONSUME_ORDERLY;
+        static const std::string PROP_CONSUME_TYPE;
+        static const std::string PROP_CLIENT_VERSION;
+        static const std::string PROP_CONSUMER_START_TIMESTAMP;
+
+    private:
+        // Disabled data members.
+        // NOTE(review): m_statusTable is declared here with
+        // map<string, ConsumerStat>, while the disabled accessors above use
+        // std::map<std::string, ConsumeStatus>; reconcile before re-enabling.
+        /*
+        std::map<std::string, std::string> m_properties;
+        std::set<SubscriptionData> m_subscriptionSet;
+        std::map<MessageQueue, ProcessQueueInfo> m_mqTable;
+        std::map<string, ConsumerStat> m_statusTable;
+        std::string m_jstack;
+        */
+    };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h b/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h
new file mode 100755
index 0000000..0ea19da
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/GetConsumerListByGroupResponseBody.h
@@ -0,0 +1,97 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/ + +#ifndef __GETCONSUMERLISTBYGROUPRESPONSEBODY_H__ +#define __GETCONSUMERLISTBYGROUPRESPONSEBODY_H__ + +#include <string> +#include <sstream> +#include <list> +#include "UtilAll.h" +#include "RemotingSerializable.h" + +namespace rmq +{ + class GetConsumerListByGroupResponseBody : public RemotingSerializable + { + public: + GetConsumerListByGroupResponseBody() + { + + } + + ~GetConsumerListByGroupResponseBody() + { + + } + + void encode(std::string& outData) + { + + } + + static GetConsumerListByGroupResponseBody* decode(const char* pData, int len) + { + /* + {"consumerIdList":["10.12.22.213@DEFAULT", "10.12.22.213@xxx"]} + */ + //RMQ_DEBUG("GET_CONSUMER_LIST_BY_GROUP_VALUE:%s", pData); + + Json::Reader reader; + Json::Value object; + if (!reader.parse(pData, pData + len, object)) + { + RMQ_ERROR("parse fail: %s", reader.getFormattedErrorMessages().c_str()); + return NULL; + } + + GetConsumerListByGroupResponseBody* rsp = new GetConsumerListByGroupResponseBody(); + Json::Value cidList = object["consumerIdList"]; + for (size_t i = 0; i < cidList.size(); i++) + { + Json::Value cid = cidList[i]; + if (cid != Json::Value::null) + { + rsp->m_consumerIdList.push_back(cid.asString()); + } + } + + return rsp; + } + + std::list<std::string>& getConsumerIdList() + { + return m_consumerIdList; + } + + void setConsumerIdList(const std::list<std::string>& consumerIdList) + { + m_consumerIdList = consumerIdList; + } + + std::string toString() const + { + std::stringstream ss; + ss << "{consumerIdList=" << UtilAll::toString(m_consumerIdList) << "}"; + return ss.str(); + } + + private: + std::list<std::string> m_consumerIdList; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp b/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp new file mode 
100755 index 0000000..73f197a --- /dev/null +++ b/rocketmq-client4cpp/src/protocol/HeartbeatData.cpp @@ -0,0 +1,52 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include "HeartbeatData.h" + +namespace rmq +{ + +void HeartbeatData::encode(std::string& outData) +{ + //{"clientID":"10.6.223.90@16164","consumerDataSet":[{"consumeFromWhere":"CONSUME_FROM_LAST_OFFSET","consumeType":"CONSUME_ACTIVELY","groupName":"please_rename_unique_group_name_5","messageModel":"CLUSTERING","subscriptionDataSet":[],"unitMode":false}],"producerDataSet":[{"groupName":"CLIENT_INNER_PRODUCER"}]} + Json::Value obj; + obj["clientID"] = m_clientID; + + Json::Value consumerDataSet(Json::arrayValue); + for (typeof(m_consumerDataSet.begin()) it = m_consumerDataSet.begin(); it != m_consumerDataSet.end(); it++) + { + Json::Value o; + (*it).toJson(o); + consumerDataSet.append(o); + } + obj["consumerDataSet"] = consumerDataSet; + + Json::Value producerDataSet(Json::arrayValue); + for (typeof(m_producerDataSet.begin()) it = m_producerDataSet.begin(); it != m_producerDataSet.end(); it++) + { + Json::Value o; + it->toJson(o); + producerDataSet.append(o); + } + obj["producerDataSet"] = producerDataSet; + + Json::FastWriter outer; + outData = outer.write(obj); +} + + + + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/HeartbeatData.h 
---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/HeartbeatData.h b/rocketmq-client4cpp/src/protocol/HeartbeatData.h new file mode 100755 index 0000000..cb0f720 --- /dev/null +++ b/rocketmq-client4cpp/src/protocol/HeartbeatData.h @@ -0,0 +1,157 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __HEARTBEATDATA_H__ +#define __HEARTBEATDATA_H__ + +#include <string> +#include <set> +#include <sstream> + +#include "RocketMQClient.h" +#include "ConsumeType.h" +#include "SubscriptionData.h" +#include "RemotingSerializable.h" +#include "UtilAll.h" + +namespace rmq +{ + struct ConsumerData + { + std::string groupName; + ConsumeType consumeType; + MessageModel messageModel; + ConsumeFromWhere consumeFromWhere; + std::set<SubscriptionData> subscriptionDataSet; + bool operator < (const ConsumerData& cd)const + { + return groupName < cd.groupName; + } + + void toJson(Json::Value& obj) const + { + //{"consumeFromWhere":"CONSUME_FROM_LAST_OFFSET","consumeType":"CONSUME_ACTIVELY","groupName":"please_rename_unique_group_name_5","messageModel":"CLUSTERING","subscriptionDataSet":[],"unitMode":false} + obj["groupName"] = groupName; + obj["messageModel"] = getMessageModelString(messageModel); + obj["consumeFromWhere"] = getConsumeFromWhereString(consumeFromWhere); + obj["consumeType"] = getConsumeTypeString(consumeType); + obj["unitMode"] = false; + + 
Json::Value objSub(Json::arrayValue); + RMQ_FOR_EACH(subscriptionDataSet, it) + { + Json::Value o; + (*it).toJson(o); + objSub.append(o); + } + obj["subscriptionDataSet"] = objSub; + } + + std::string toString() const + { + std::stringstream ss; + ss << "{groupName=" << groupName + << ",messageModel=" << getMessageModelString(messageModel) + << ",consumeFromWhere=" << getConsumeFromWhereString(consumeFromWhere) + << ",consumeType=" << getConsumeTypeString(consumeType) + << ",subscriptionDataSet=" << UtilAll::toString(subscriptionDataSet) + << "}"; + return ss.str(); + } + }; + inline std::ostream& operator<<(std::ostream& os, const ConsumerData& obj) + { + os << obj.toString(); + return os; + } + + struct ProducerData + { + std::string groupName; + bool operator < (const ProducerData& pd)const + { + return groupName < pd.groupName; + } + void toJson(Json::Value& obj) const + { + obj["groupName"] = groupName; + } + + std::string toString() const + { + std::stringstream ss; + ss << "{groupName=" << groupName << "}"; + return ss.str(); + } + }; + inline std::ostream& operator<<(std::ostream& os, const ProducerData& obj) + { + os << obj.toString(); + return os; + } + + + class HeartbeatData : public RemotingSerializable + { + public: + void encode(std::string& outData); + + std::string getClientID() + { + return m_clientID; + } + + void setClientID(const std::string& clientID) + { + m_clientID = clientID; + } + + std::set<ProducerData>& getProducerDataSet() + { + return m_producerDataSet; + } + + void setProducerDataSet(const std::set<ProducerData>& producerDataSet) + { + m_producerDataSet = producerDataSet; + } + + std::set<ConsumerData>& getConsumerDataSet() + { + return m_consumerDataSet; + } + + void setConsumerDataSet(const std::set<ConsumerData>& consumerDataSet) + { + m_consumerDataSet = consumerDataSet; + } + + std::string toString() const + { + std::stringstream ss; + ss << "{clientID=" << m_clientID + << ",producerDataSet=" << 
UtilAll::toString(m_producerDataSet) + << ",consumerDataSet=" << UtilAll::toString(m_consumerDataSet) << "}"; + return ss.str(); + } + + private: + std::string m_clientID; + std::set<ProducerData> m_producerDataSet; + std::set<ConsumerData> m_consumerDataSet; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/KVTable.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/KVTable.h b/rocketmq-client4cpp/src/protocol/KVTable.h new file mode 100755 index 0000000..726b872 --- /dev/null +++ b/rocketmq-client4cpp/src/protocol/KVTable.h @@ -0,0 +1,58 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#ifndef __KVTABLE_H__ +#define __KVTABLE_H__ + +#include <map> +#include <string> +#include "RemotingSerializable.h" +#include "UtilAll.h" + +namespace rmq +{ + class KVTable : public RemotingSerializable + { + public: + void encode(std::string& outData) + { + + } + + std::string toString() const + { + std::stringstream ss; + ss << "{table=" << UtilAll::toString(m_table) + << "}"; + return ss.str(); + } + + const std::map<std::string, std::string>& getTable() + { + return m_table; + } + + void setTable(const std::map<std::string, std::string>& table) + { + m_table = table; + } + + private: + std::map<std::string, std::string> m_table ; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp b/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp new file mode 100755 index 0000000..947abe2 --- /dev/null +++ b/rocketmq-client4cpp/src/protocol/LockBatchBody.cpp @@ -0,0 +1,112 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/
+#include "LockBatchBody.h"
+
+#include <sstream>
+
+#include "UtilAll.h"
+
+namespace rmq
+{
+
+LockBatchRequestBody::LockBatchRequestBody()
+{
+}
+
+LockBatchRequestBody::~LockBatchRequestBody()
+{
+}
+
+// Intentionally a no-op: outData is left untouched.
+void LockBatchRequestBody::encode(std::string& outData)
+{
+
+}
+
+// Debug text: consumer group, client id and the queue set.
+std::string LockBatchRequestBody::toString() const
+{
+    std::stringstream ss;
+    ss << "{consumerGroup=" << m_consumerGroup
+       << ",clientId=" << m_clientId
+       << ",mqSet=" << UtilAll::toString(m_mqSet)
+       << "}";
+    return ss.str();
+}
+
+
+std::string LockBatchRequestBody::getConsumerGroup()
+{
+    return m_consumerGroup;
+}
+
+void LockBatchRequestBody::setConsumerGroup(const std::string& consumerGroup)
+{
+    m_consumerGroup = consumerGroup;
+}
+
+std::string LockBatchRequestBody::getClientId()
+{
+    return m_clientId;
+}
+
+void LockBatchRequestBody::setClientId(const std::string& clientId)
+{
+    m_clientId = clientId;
+}
+
+std::set<MessageQueue>& LockBatchRequestBody::getMqSet()
+{
+    return m_mqSet;
+}
+
+void LockBatchRequestBody::setMqSet(const std::set<MessageQueue>& mqSet)
+{
+    m_mqSet = mqSet;
+}
+
+LockBatchResponseBody::LockBatchResponseBody()
+{
+}
+
+LockBatchResponseBody::~LockBatchResponseBody()
+{
+}
+
+// Intentionally a no-op: outData is left untouched.
+void LockBatchResponseBody::encode(std::string& outData)
+{
+}
+
+// Debug text for the set of locked queues. The label names the member that
+// is actually printed; it previously read "consumerGroup", a leftover from
+// the request body's toString().
+std::string LockBatchResponseBody::toString() const
+{
+    std::stringstream ss;
+    ss << "{lockOKMQSet=" << UtilAll::toString(m_lockOKMQSet)
+       << "}";
+    return ss.str();
+}
+
+
+// Returns a newly allocated, empty response body that the caller must delete.
+// NOTE(review): pData and len are ignored, so the returned object never
+// contains any queues from the wire data. Confirm whether that is intended
+// before relying on getLockOKMQSet() of a decoded response.
+LockBatchResponseBody* LockBatchResponseBody::decode(const char* pData, int len)
+{
+    return new LockBatchResponseBody();
+}
+
+std::set<MessageQueue> LockBatchResponseBody::getLockOKMQSet()
+{
+    return m_lockOKMQSet;
+}
+
+void LockBatchResponseBody::setLockOKMQSet(const std::set<MessageQueue>& lockOKMQSet)
+{
+    m_lockOKMQSet = lockOKMQSet;
+}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/LockBatchBody.h
----------------------------------------------------------------------
diff --git
a/rocketmq-client4cpp/src/protocol/LockBatchBody.h b/rocketmq-client4cpp/src/protocol/LockBatchBody.h
new file mode 100755
index 0000000..ab9ee02
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/LockBatchBody.h
@@ -0,0 +1,73 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __LOCKBATCHBODY_H__
+#define __LOCKBATCHBODY_H__
+
+#include <string>
+#include <set>
+
+#include "RemotingSerializable.h"
+#include "MessageQueue.h"
+
+namespace rmq
+{
+    // Request body holding a consumer group, a client id and a set of
+    // message queues. Also used for unlock requests through the
+    // UnlockBatchRequestBody typedef at the end of this namespace.
+    class LockBatchRequestBody : public RemotingSerializable
+    {
+    public:
+        LockBatchRequestBody();
+        ~LockBatchRequestBody();
+
+        // Serializes this body into outData.
+        void encode(std::string& outData);
+        // Debug text listing all three fields.
+        std::string toString() const;
+
+        std::string getConsumerGroup();
+        void setConsumerGroup(const std::string& consumerGroup);
+
+        std::string getClientId();
+        void setClientId(const std::string& clientId);
+
+        // Returns a mutable reference to the stored set.
+        std::set<MessageQueue>& getMqSet();
+        void setMqSet(const std::set<MessageQueue>& mqSet);
+
+    private:
+        std::string m_consumerGroup;
+        std::string m_clientId;
+        std::set<MessageQueue> m_mqSet;
+    };
+
+    // Response body holding a set of message queues (m_lockOKMQSet).
+    class LockBatchResponseBody : public RemotingSerializable
+    {
+    public:
+        LockBatchResponseBody();
+        ~LockBatchResponseBody();
+
+        // Serializes this body into outData.
+        void encode(std::string& outData);
+        // Debug text for the stored queue set.
+        std::string toString() const;
+
+        // Builds a response body from len bytes at pData; returns a pointer
+        // to the new object.
+        static LockBatchResponseBody* decode(const char* pData, int len);
+
+        // Returns a copy of the stored set (by value, unlike getMqSet()).
+        std::set<MessageQueue> getLockOKMQSet();
+        void setLockOKMQSet(const std::set<MessageQueue>& lockOKMQSet);
+
+    private:
+        std::set<MessageQueue> m_lockOKMQSet;
+    };
+
+    // Unlock requests reuse the lock request layout.
+    typedef LockBatchRequestBody UnlockBatchRequestBody;
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/MQProtos.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/protocol/MQProtos.cpp b/rocketmq-client4cpp/src/protocol/MQProtos.cpp
new file mode 100755
index 0000000..052c104
--- /dev/null
+++ b/rocketmq-client4cpp/src/protocol/MQProtos.cpp
@@ -0,0 +1,248 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/ + +#include "MQProtos.h" + +namespace rmq +{ + +const char* getMQRequestCodeString(int code) +{ + switch (code) + { + case SEND_MESSAGE_VALUE: + return "SEND_MESSAGE_VALUE"; + case PULL_MESSAGE_VALUE: + return "PULL_MESSAGE_VALUE"; + case QUERY_MESSAGE_VALUE: + return "QUERY_MESSAGE_VALUE"; + case QUERY_BROKER_OFFSET_VALUE: + return "QUERY_BROKER_OFFSET_VALUE"; + case QUERY_CONSUMER_OFFSET_VALUE: + return "QUERY_CONSUMER_OFFSET_VALUE"; + case UPDATE_CONSUMER_OFFSET_VALUE: + return "UPDATE_CONSUMER_OFFSET_VALUE"; + case UPDATE_AND_CREATE_TOPIC_VALUE: + return "UPDATE_AND_CREATE_TOPIC_VALUE"; + case GET_ALL_TOPIC_CONFIG_VALUE: + return "GET_ALL_TOPIC_CONFIG_VALUE"; + case GET_TOPIC_CONFIG_LIST_VALUE: + return "GET_TOPIC_CONFIG_LIST_VALUE"; + case GET_TOPIC_NAME_LIST_VALUE: + return "GET_TOPIC_NAME_LIST_VALUE"; + case UPDATE_BROKER_CONFIG_VALUE: + return "UPDATE_BROKER_CONFIG_VALUE"; + case GET_BROKER_CONFIG_VALUE: + return "GET_BROKER_CONFIG_VALUE"; + case TRIGGER_DELETE_FILES_VALUE: + return "TRIGGER_DELETE_FILES_VALUE"; + case GET_BROKER_RUNTIME_INFO_VALUE: + return "GET_BROKER_RUNTIME_INFO_VALUE"; + case SEARCH_OFFSET_BY_TIMESTAMP_VALUE: + return "SEARCH_OFFSET_BY_TIMESTAMP_VALUE"; + case GET_MAX_OFFSET_VALUE: + return "GET_MAX_OFFSET_VALUE"; + case GET_MIN_OFFSET_VALUE: + return "GET_MIN_OFFSET_VALUE"; + case GET_EARLIEST_MSG_STORETIME_VALUE: + return "GET_EARLIEST_MSG_STORETIME_VALUE"; + case VIEW_MESSAGE_BY_ID_VALUE: + return "VIEW_MESSAGE_BY_ID_VALUE"; + case HEART_BEAT_VALUE: + return "HEART_BEAT_VALUE"; + case UNREGISTER_CLIENT_VALUE: + return "UNREGISTER_CLIENT_VALUE"; + case CONSUMER_SEND_MSG_BACK_VALUE: + return "CONSUMER_SEND_MSG_BACK_VALUE"; + case END_TRANSACTION_VALUE: + return "END_TRANSACTION_VALUE"; + case GET_CONSUMER_LIST_BY_GROUP_VALUE: + return "GET_CONSUMER_LIST_BY_GROUP_VALUE"; + case CHECK_TRANSACTION_STATE_VALUE: + return "CHECK_TRANSACTION_STATE_VALUE"; + case NOTIFY_CONSUMER_IDS_CHANGED_VALUE: + return 
"NOTIFY_CONSUMER_IDS_CHANGED_VALUE"; + case LOCK_BATCH_MQ_VALUE: + return "LOCK_BATCH_MQ_VALUE"; + case UNLOCK_BATCH_MQ_VALUE: + return "UNLOCK_BATCH_MQ_VALUE"; + case GET_ALL_CONSUMER_OFFSET_VALUE: + return "GET_ALL_CONSUMER_OFFSET_VALUE"; + case GET_ALL_DELAY_OFFSET_VALUE: + return "GET_ALL_DELAY_OFFSET_VALUE"; + case PUT_KV_CONFIG_VALUE: + return "PUT_KV_CONFIG_VALUE"; + case GET_KV_CONFIG_VALUE: + return "GET_KV_CONFIG_VALUE"; + case DELETE_KV_CONFIG_VALUE: + return "DELETE_KV_CONFIG_VALUE"; + case REGISTER_BROKER_VALUE: + return "REGISTER_BROKER_VALUE"; + case UNREGISTER_BROKER_VALUE: + return "UNREGISTER_BROKER_VALUE"; + case GET_ROUTEINTO_BY_TOPIC_VALUE: + return "GET_ROUTEINTO_BY_TOPIC_VALUE"; + case GET_BROKER_CLUSTER_INFO_VALUE: + return "GET_BROKER_CLUSTER_INFO_VALUE"; + case UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_VALUE: + return "UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_VALUE"; + case GET_ALL_SUBSCRIPTIONGROUP_CONFIG_VALUE: + return "GET_ALL_SUBSCRIPTIONGROUP_CONFIG_VALUE"; + case GET_TOPIC_STATS_INFO_VALUE: + return "GET_TOPIC_STATS_INFO_VALUE"; + case GET_CONSUMER_CONNECTION_LIST_VALUE: + return "GET_CONSUMER_CONNECTION_LIST_VALUE"; + case GET_PRODUCER_CONNECTION_LIST_VALUE: + return "GET_PRODUCER_CONNECTION_LIST_VALUE"; + case WIPE_WRITE_PERM_OF_BROKER_VALUE: + return "WIPE_WRITE_PERM_OF_BROKER_VALUE"; + case GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE: + return "GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE"; + case DELETE_SUBSCRIPTIONGROUP_VALUE: + return "DELETE_SUBSCRIPTIONGROUP_VALUE"; + case GET_CONSUME_STATS_VALUE: + return "GET_CONSUME_STATS_VALUE"; + case SUSPEND_CONSUMER_VALUE: + return "SUSPEND_CONSUMER_VALUE"; + case RESUME_CONSUMER_VALUE: + return "RESUME_CONSUMER_VALUE"; + case RESET_CONSUMER_OFFSET_IN_CONSUMER_VALUE: + return "RESET_CONSUMER_OFFSET_IN_CONSUMER_VALUE"; + case RESET_CONSUMER_OFFSET_IN_BROKER_VALUE: + return "RESET_CONSUMER_OFFSET_IN_BROKER_VALUE"; + case ADJUST_CONSUMER_THREAD_POOL_VALUE: + return "ADJUST_CONSUMER_THREAD_POOL_VALUE"; + 
case WHO_CONSUME_THE_MESSAGE_VALUE: + return "WHO_CONSUME_THE_MESSAGE_VALUE"; + case DELETE_TOPIC_IN_BROKER_VALUE: + return "DELETE_TOPIC_IN_BROKER_VALUE"; + case DELETE_TOPIC_IN_NAMESRV_VALUE: + return "DELETE_TOPIC_IN_NAMESRV_VALUE"; + case GET_KV_CONFIG_BY_VALUE_VALUE: + return "GET_KV_CONFIG_BY_VALUE_VALUE"; + case DELETE_KV_CONFIG_BY_VALUE_VALUE: + return "DELETE_KV_CONFIG_BY_VALUE_VALUE"; + case GET_KVLIST_BY_NAMESPACE_VALUE: + return "GET_KVLIST_BY_NAMESPACE_VALUE"; + case RESET_CONSUMER_CLIENT_OFFSET_VALUE: + return "RESET_CONSUMER_CLIENT_OFFSET_VALUE"; + case GET_CONSUMER_STATUS_FROM_CLIENT_VALUE: + return "GET_CONSUMER_STATUS_FROM_CLIENT_VALUE"; + case INVOKE_BROKER_TO_RESET_OFFSET_VALUE: + return "INVOKE_BROKER_TO_RESET_OFFSET_VALUE"; + case INVOKE_BROKER_TO_GET_CONSUMER_STATUS_VALUE: + return "INVOKE_BROKER_TO_GET_CONSUMER_STATUS_VALUE"; + case QUERY_TOPIC_CONSUME_BY_WHO_VALUE: + return "QUERY_TOPIC_CONSUME_BY_WHO_VALUE"; + case GET_TOPICS_BY_CLUSTER_VALUE: + return "GET_TOPICS_BY_CLUSTER_VALUE"; + case REGISTER_FILTER_SERVER_VALUE: + return "REGISTER_FILTER_SERVER_VALUE"; + case REGISTER_MESSAGE_FILTER_CLASS_VALUE: + return "REGISTER_MESSAGE_FILTER_CLASS_VALUE"; + case QUERY_CONSUME_TIME_SPAN_VALUE: + return "QUERY_CONSUME_TIME_SPAN_VALUE"; + case GET_SYSTEM_TOPIC_LIST_FROM_NS_VALUE: + return "GET_SYSTEM_TOPIC_LIST_FROM_NS_VALUE"; + case GET_SYSTEM_TOPIC_LIST_FROM_BROKER_VALUE: + return "GET_SYSTEM_TOPIC_LIST_FROM_BROKER_VALUE"; + case CLEAN_EXPIRED_CONSUMEQUEUE_VALUE: + return "CLEAN_EXPIRED_CONSUMEQUEUE_VALUE"; + case GET_CONSUMER_RUNNING_INFO_VALUE: + return "GET_CONSUMER_RUNNING_INFO_VALUE"; + case QUERY_CORRECTION_OFFSET_VALUE: + return "QUERY_CORRECTION_OFFSET_VALUE"; + case CONSUME_MESSAGE_DIRECTLY_VALUE: + return "CONSUME_MESSAGE_DIRECTLY_VALUE"; + case SEND_MESSAGE_V2_VALUE: + return "SEND_MESSAGE_V2_VALUE"; + case GET_UNIT_TOPIC_LIST_VALUE: + return "GET_UNIT_TOPIC_LIST_VALUE"; + case GET_HAS_UNIT_SUB_TOPIC_LIST_VALUE: + return 
"GET_HAS_UNIT_SUB_TOPIC_LIST_VALUE"; + case GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST_VALUE: + return "GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST_VALUE"; + case CLONE_GROUP_OFFSET_VALUE: + return "CLONE_GROUP_OFFSET_VALUE"; + case VIEW_BROKER_STATS_DATA_VALUE: + return "VIEW_BROKER_STATS_DATA_VALUE"; + } + + return "UnknowMQRequestCode"; +} + +const char* getMQResponseCodeString(int code) +{ + switch (code) + { + case 0: + return "OK"; + case FLUSH_DISK_TIMEOUT_VALUE: + return "FLUSH_DISK_TIMEOUT_VALUE"; + case SLAVE_NOT_AVAILABLE_VALUE: + return "SLAVE_NOT_AVAILABLE_VALUE"; + case FLUSH_SLAVE_TIMEOUT_VALUE: + return "FLUSH_SLAVE_TIMEOUT_VALUE"; + case MESSAGE_ILLEGAL_VALUE: + return "MESSAGE_ILLEGAL_VALUE"; + case SERVICE_NOT_AVAILABLE_VALUE: + return "SERVICE_NOT_AVAILABLE_VALUE"; + case VERSION_NOT_SUPPORTED_VALUE: + return "VERSION_NOT_SUPPORTED_VALUE"; + case NO_PERMISSION_VALUE: + return "NO_PERMISSION_VALUE"; + case TOPIC_NOT_EXIST_VALUE: + return "TOPIC_NOT_EXIST_VALUE"; + case TOPIC_EXIST_ALREADY_VALUE: + return "TOPIC_EXIST_ALREADY_VALUE"; + case PULL_NOT_FOUND_VALUE: + return "PULL_NOT_FOUND_VALUE"; + case PULL_RETRY_IMMEDIATELY_VALUE: + return "PULL_RETRY_IMMEDIATELY_VALUE"; + case PULL_OFFSET_MOVED_VALUE: + return "PULL_OFFSET_MOVED_VALUE"; + case QUERY_NOT_FOUND_VALUE: + return "QUERY_NOT_FOUND_VALUE"; + case SUBSCRIPTION_PARSE_FAILED_VALUE: + return "SUBSCRIPTION_PARSE_FAILED_VALUE"; + case SUBSCRIPTION_NOT_EXIST_VALUE: + return "SUBSCRIPTION_NOT_EXIST_VALUE"; + case SUBSCRIPTION_NOT_LATEST_VALUE: + return "SUBSCRIPTION_NOT_LATEST_VALUE"; + case SUBSCRIPTION_GROUP_NOT_EXIST_VALUE: + return "SUBSCRIPTION_GROUP_NOT_EXIST_VALUE"; + case TRANSACTION_SHOULD_COMMIT_VALUE: + return "TRANSACTION_SHOULD_COMMIT_VALUE"; + case TRANSACTION_SHOULD_ROLLBACK_VALUE: + return "TRANSACTION_SHOULD_ROLLBACK_VALUE"; + case TRANSACTION_STATE_UNKNOW_VALUE: + return "TRANSACTION_STATE_UNKNOW_VALUE"; + case TRANSACTION_STATE_GROUP_WRONG_VALUE: + return 
"TRANSACTION_STATE_GROUP_WRONG_VALUE"; + case NO_BUYER_ID_VALUE: + return "NO_BUYER_ID_VALUE"; + case NOT_IN_CURRENT_UNIT_VALUE: + return "NOT_IN_CURRENT_UNIT_VALUE"; + case CONSUMER_NOT_ONLINE_VALUE: + return "CONSUMER_NOT_ONLINE_VALUE"; + case CONSUME_MSG_TIMEOUT_VALUE: + return "CONSUME_MSG_TIMEOUT_VALUE"; + } + + return "UnknowMQResponseCode"; +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/MQProtos.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/MQProtos.h b/rocketmq-client4cpp/src/protocol/MQProtos.h new file mode 100755 index 0000000..94167ea --- /dev/null +++ b/rocketmq-client4cpp/src/protocol/MQProtos.h @@ -0,0 +1,150 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef __MQPROTOS_H__ +#define __MQPROTOS_H__ + +namespace rmq +{ + enum MQRequestCode + { + // broker + SEND_MESSAGE_VALUE = 10, + PULL_MESSAGE_VALUE = 11, + QUERY_MESSAGE_VALUE = 12, + QUERY_BROKER_OFFSET_VALUE = 13, + QUERY_CONSUMER_OFFSET_VALUE = 14, + UPDATE_CONSUMER_OFFSET_VALUE = 15, + UPDATE_AND_CREATE_TOPIC_VALUE = 17, + + GET_ALL_TOPIC_CONFIG_VALUE = 21, + GET_TOPIC_CONFIG_LIST_VALUE = 22, + GET_TOPIC_NAME_LIST_VALUE = 23, + UPDATE_BROKER_CONFIG_VALUE = 25, + GET_BROKER_CONFIG_VALUE = 26, + TRIGGER_DELETE_FILES_VALUE = 27, + GET_BROKER_RUNTIME_INFO_VALUE = 28, + SEARCH_OFFSET_BY_TIMESTAMP_VALUE = 29, + + GET_MAX_OFFSET_VALUE = 30, + GET_MIN_OFFSET_VALUE = 31, + GET_EARLIEST_MSG_STORETIME_VALUE = 32, + VIEW_MESSAGE_BY_ID_VALUE = 33, + HEART_BEAT_VALUE = 34, + UNREGISTER_CLIENT_VALUE = 35, + CONSUMER_SEND_MSG_BACK_VALUE = 36, + END_TRANSACTION_VALUE = 37, + GET_CONSUMER_LIST_BY_GROUP_VALUE = 38, + CHECK_TRANSACTION_STATE_VALUE = 39, + + NOTIFY_CONSUMER_IDS_CHANGED_VALUE = 40, + LOCK_BATCH_MQ_VALUE = 41, + UNLOCK_BATCH_MQ_VALUE = 42, + GET_ALL_CONSUMER_OFFSET_VALUE = 43, + GET_ALL_DELAY_OFFSET_VALUE = 45, + + // Namesrv + PUT_KV_CONFIG_VALUE = 100, + GET_KV_CONFIG_VALUE = 101, + DELETE_KV_CONFIG_VALUE = 102, + REGISTER_BROKER_VALUE = 103, + UNREGISTER_BROKER_VALUE = 104, + GET_ROUTEINTO_BY_TOPIC_VALUE = 105, + GET_BROKER_CLUSTER_INFO_VALUE = 106, + + // broker && namesrv + UPDATE_AND_CREATE_SUBSCRIPTIONGROUP_VALUE = 200, + GET_ALL_SUBSCRIPTIONGROUP_CONFIG_VALUE = 201, + GET_TOPIC_STATS_INFO_VALUE = 202, + GET_CONSUMER_CONNECTION_LIST_VALUE = 203, + GET_PRODUCER_CONNECTION_LIST_VALUE = 204, + WIPE_WRITE_PERM_OF_BROKER_VALUE = 205, + GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE = 206, + DELETE_SUBSCRIPTIONGROUP_VALUE = 207, + GET_CONSUME_STATS_VALUE = 208, + SUSPEND_CONSUMER_VALUE = 209, + + RESUME_CONSUMER_VALUE = 210, + RESET_CONSUMER_OFFSET_IN_CONSUMER_VALUE = 211, + RESET_CONSUMER_OFFSET_IN_BROKER_VALUE = 212, + ADJUST_CONSUMER_THREAD_POOL_VALUE = 213, + 
WHO_CONSUME_THE_MESSAGE_VALUE = 214, + DELETE_TOPIC_IN_BROKER_VALUE = 215, + DELETE_TOPIC_IN_NAMESRV_VALUE = 216, + GET_KV_CONFIG_BY_VALUE_VALUE = 217, + DELETE_KV_CONFIG_BY_VALUE_VALUE = 218, + GET_KVLIST_BY_NAMESPACE_VALUE = 219, + + RESET_CONSUMER_CLIENT_OFFSET_VALUE = 220, + GET_CONSUMER_STATUS_FROM_CLIENT_VALUE = 221, + INVOKE_BROKER_TO_RESET_OFFSET_VALUE = 222, + INVOKE_BROKER_TO_GET_CONSUMER_STATUS_VALUE = 223, + GET_TOPICS_BY_CLUSTER_VALUE = 224, + + QUERY_TOPIC_CONSUME_BY_WHO_VALUE = 300, + REGISTER_FILTER_SERVER_VALUE = 301, + REGISTER_MESSAGE_FILTER_CLASS_VALUE = 302, + QUERY_CONSUME_TIME_SPAN_VALUE = 303, + GET_SYSTEM_TOPIC_LIST_FROM_NS_VALUE = 304, + GET_SYSTEM_TOPIC_LIST_FROM_BROKER_VALUE = 305, + CLEAN_EXPIRED_CONSUMEQUEUE_VALUE = 306, + GET_CONSUMER_RUNNING_INFO_VALUE = 307, + QUERY_CORRECTION_OFFSET_VALUE = 308, + CONSUME_MESSAGE_DIRECTLY_VALUE = 309, + + SEND_MESSAGE_V2_VALUE = 310, + GET_UNIT_TOPIC_LIST_VALUE = 311, + GET_HAS_UNIT_SUB_TOPIC_LIST_VALUE = 312, + GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST_VALUE = 313, + CLONE_GROUP_OFFSET_VALUE = 314, + VIEW_BROKER_STATS_DATA_VALUE = 315, + }; + + enum MQResponseCode + { + FLUSH_DISK_TIMEOUT_VALUE = 10, + SLAVE_NOT_AVAILABLE_VALUE = 11, + FLUSH_SLAVE_TIMEOUT_VALUE = 12, + MESSAGE_ILLEGAL_VALUE = 13, + SERVICE_NOT_AVAILABLE_VALUE = 14, + VERSION_NOT_SUPPORTED_VALUE = 15, + NO_PERMISSION_VALUE = 16, + TOPIC_NOT_EXIST_VALUE = 17, + TOPIC_EXIST_ALREADY_VALUE = 18, + PULL_NOT_FOUND_VALUE = 19, + + PULL_RETRY_IMMEDIATELY_VALUE = 20, + PULL_OFFSET_MOVED_VALUE = 21, + QUERY_NOT_FOUND_VALUE = 22, + SUBSCRIPTION_PARSE_FAILED_VALUE = 23, + SUBSCRIPTION_NOT_EXIST_VALUE = 24, + SUBSCRIPTION_NOT_LATEST_VALUE = 25, + SUBSCRIPTION_GROUP_NOT_EXIST_VALUE = 26, + + TRANSACTION_SHOULD_COMMIT_VALUE = 200, + TRANSACTION_SHOULD_ROLLBACK_VALUE = 201, + TRANSACTION_STATE_UNKNOW_VALUE = 202, + TRANSACTION_STATE_GROUP_WRONG_VALUE = 203, + NO_BUYER_ID_VALUE = 204, + NOT_IN_CURRENT_UNIT_VALUE = 205, + CONSUMER_NOT_ONLINE_VALUE = 206, + 
CONSUME_MSG_TIMEOUT_VALUE = 207, + }; + + const char* getMQRequestCodeString(int code); + const char* getMQResponseCodeString(int code); +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h b/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h new file mode 100755 index 0000000..56ee4e4 --- /dev/null +++ b/rocketmq-client4cpp/src/protocol/OffsetSerializeWrapper.h @@ -0,0 +1,135 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#ifndef __OFFSETSERIALIZEWRAPPER_H__ +#define __OFFSETSERIALIZEWRAPPER_H__ + +#include <map> +#include <string> +#include "RemotingSerializable.h" +#include "MessageQueue.h" +#include "AtomicValue.h" +#include "UtilAll.h" +#include "json/json.h" + + +namespace rmq +{ + class OffsetSerializeWrapper : public RemotingSerializable + { + public: + void encode(std::string& outData) + { + Json::Value offsetTable; + RMQ_FOR_EACH(m_offsetTable, it) + { + MessageQueue mq = it->first; + kpr::AtomicLong& offset = it->second; + + std::string mqStr = mq.toJsonString(); + offsetTable[mqStr] = offset.get(); + } + + Json::Value obj; + obj["offsetTable"] = offsetTable; + + Json::FastWriter writer; + outData = writer.write(obj); + } + static OffsetSerializeWrapper* decode(const char* pData, int len) + { + /* + { + "offsetTable":{ + '{"brokerName":"broker-a","queueId":3,"topic":"TopicTest"}':0, + '{"brokerName":"broker-a","queueId":2,"topic":"TopicTest"}':0 + } + + } + */ + + RMQ_DEBUG("decode, data:%s", pData); + + Json::Reader reader; + Json::Value obj; + if (!reader.parse(pData, pData + len, obj)) + { + return NULL; + } + + RMQ_DEBUG("decode ok"); + + if (obj.isObject()) + { + Json::Value objOffsetTable = obj["offsetTable"]; + if (objOffsetTable.isObject()) + { + std::map<MessageQueue, kpr::AtomicLong> offsetTable; + OffsetSerializeWrapper* offsetWrapper = new OffsetSerializeWrapper(); + + Json::Value::Members members = objOffsetTable.getMemberNames(); + for (typeof(members.begin()) it = members.begin(); it != members.end(); it++) + { + std::string key = *it; + Json::Value objMq; + RMQ_DEBUG("decode, key:%s", key.c_str()); + if (!reader.parse(key, objMq)) + { + continue; + } + RMQ_DEBUG("decode, key ok"); + + MessageQueue mq(objMq["topic"].asString(), objMq["brokerName"].asString(), + objMq["queueId"].asInt()); + long long offset = objOffsetTable[key].asInt64(); + + offsetTable[mq] = kpr::AtomicLong(offset); + } + offsetWrapper->setOffsetTable(offsetTable); + + return 
offsetWrapper; + } + } + + return NULL; + } + + std::string toString() const + { + std::stringstream ss; + ss << "{offsetTable=" << UtilAll::toString(m_offsetTable) + << "}"; + return ss.str(); + } + + std::map<MessageQueue, kpr::AtomicLong>& getOffsetTable() + { + return m_offsetTable; + } + + void setOffsetTable(const std::map<MessageQueue, kpr::AtomicLong>& table) + { + m_offsetTable = table; + } + + private: + std::map<MessageQueue, kpr::AtomicLong> m_offsetTable; + }; + + typedef kpr::RefHandleT<OffsetSerializeWrapper> OffsetSerializeWrapperPtr; +} + +#endif
