http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQClientException.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MQClientException.h b/rocketmq-client4cpp/include/MQClientException.h new file mode 100755 index 0000000..f1d1d04 --- /dev/null +++ b/rocketmq-client4cpp/include/MQClientException.h @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __RMQ_MQCLIENTEXCEPTION_H__ +#define __RMQ_MQCLIENTEXCEPTION_H__ + +#include <string> +#include <ostream> +#include <sstream> +#include <exception> + +#include "RocketMQClient.h" + +namespace rmq +{ + class MQException : public std::exception + { + public: + MQException(const std::string& msg, int error,const char* file,int line)throw() + : m_error(error),m_line(line),m_file(file) + { + try + { + std::stringstream ss; + ss << "[" << file << ":" << line <<"]|error: " << error << "|msg:" << msg; + m_msg = ss.str(); + } + catch (...) 
+ { + } + } + + virtual ~MQException()throw() + { + } + + const char* what() const throw() + { + return m_msg.c_str(); + } + + int GetError() const throw() + { + return m_error; + } + + virtual const char* GetType() const throw() + { + return "MQException"; + } + + protected: + int m_error; + int m_line; + std::string m_msg; + std::string m_file; + }; + + inline std::ostream& operator<<(std::ostream& os, const MQException& e) + { + os <<"Type:"<<e.GetType() << e.what(); + return os; + } + + #define DEFINE_MQCLIENTEXCEPTION(name, parent) \ + class name : public parent \ + {\ + public:\ + name(const std::string& msg, int error,const char* file,int line) throw ()\ + : parent(msg, error, file, line) {}\ + virtual const char* GetType() const throw()\ + {\ + return #name;\ + }\ + }; + + DEFINE_MQCLIENTEXCEPTION(MQClientException, MQException) + DEFINE_MQCLIENTEXCEPTION(MQBrokerException, MQException) + DEFINE_MQCLIENTEXCEPTION(InterruptedException, MQException) + DEFINE_MQCLIENTEXCEPTION(UnknownHostException, MQException) + + DEFINE_MQCLIENTEXCEPTION(RemotingException, MQException) + DEFINE_MQCLIENTEXCEPTION(RemotingCommandException, RemotingException) + DEFINE_MQCLIENTEXCEPTION(RemotingConnectException, RemotingException) + DEFINE_MQCLIENTEXCEPTION(RemotingSendRequestException, RemotingException) + DEFINE_MQCLIENTEXCEPTION(RemotingTimeoutException, RemotingException) + DEFINE_MQCLIENTEXCEPTION(RemotingTooMuchRequestException, RemotingException) + + #define THROW_MQEXCEPTION(e,msg,err) throw e(msg,err,__FILE__,__LINE__) +} + +#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQConsumer.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MQConsumer.h b/rocketmq-client4cpp/include/MQConsumer.h new file mode 100755 index 0000000..87efe97 --- /dev/null +++ b/rocketmq-client4cpp/include/MQConsumer.h @@ -0,0 +1,48 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/
+
+#ifndef __RMQ_MQCONSUMER_H__
+#define __RMQ_MQCONSUMER_H__
+
+#include <set>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQAdmin.h"
+#include "ConsumeType.h"
+
+
+namespace rmq
+{
+    class MessageExt;
+
+    /**
+    * Consumer interface
+    *
+    * Abstract base of the consumer interfaces: extends MQAdmin and, apart
+    * from the inline virtual destructor, declares only pure virtual
+    * operations for the concrete consumer to implement.
+    */
+    class MQConsumer : public MQAdmin
+    {
+    public:
+        virtual ~MQConsumer(){}
+
+        // Lifecycle hooks; their exact semantics are defined by the implementation.
+        virtual void start()=0;
+        virtual void shutdown()=0;
+
+        // NOTE(review): presumably hands msg back for redelivery at delayLevel -
+        // confirm against the implementation.
+        virtual void sendMessageBack(MessageExt& msg, int delayLevel)=0;
+        // Returns a pointer to a set of queues for topic.
+        // NOTE(review): ownership of the returned set is not visible in this header.
+        virtual std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic)=0;
+    };
+}
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQProducer.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MQProducer.h b/rocketmq-client4cpp/include/MQProducer.h new file mode 100755 index 0000000..b353aba --- /dev/null +++ b/rocketmq-client4cpp/include/MQProducer.h @@ -0,0 +1,71 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/
+
+#ifndef __RMQ_MQPRODUCER_H__
+#define __RMQ_MQPRODUCER_H__
+
+#include <vector>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQAdmin.h"
+#include "SendResult.h"
+
+namespace rmq
+{
+    class MessageQueue;
+    class SendCallback;
+    class LocalTransactionExecuter;
+    class MessageQueueSelector;
+
+    /**
+    * Producer interface
+    *
+    * Abstract base extending MQAdmin. Every operation is pure virtual; the
+    * send family comes in three shapes (returning a SendResult, taking a
+    * SendCallback, and "oneway"), each offered without a queue, with an
+    * explicit MessageQueue, and with a MessageQueueSelector plus opaque arg.
+    */
+    class MQProducer : public MQAdmin
+    {
+    public:
+        MQProducer()
+        {
+        }
+
+        virtual ~MQProducer()
+        {
+        }
+
+        // Lifecycle hooks; their exact semantics are defined by the implementation.
+        virtual void start()=0;
+        virtual void shutdown()=0;
+
+        // NOTE(review): ownership of the returned vector is not visible in this header.
+        virtual std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic)=0;
+
+        // Overloads that take only the message.
+        virtual SendResult send(Message& msg)=0;
+        virtual void send(Message& msg, SendCallback* sendCallback)=0;
+        virtual void sendOneway(Message& msg)=0;
+
+        // Overloads that take an explicit target queue.
+        virtual SendResult send(Message& msg, MessageQueue& mq)=0;
+        virtual void send(Message& msg, MessageQueue& mq, SendCallback* sendCallback)=0;
+        virtual void sendOneway(Message& msg, MessageQueue& mq)=0;
+
+        // Overloads that take a selector; arg is an opaque pointer
+        // (presumably forwarded to the selector - confirm against the implementation).
+        virtual SendResult send(Message& msg, MessageQueueSelector* selector, void* arg)=0;
+        virtual void send(Message& msg, MessageQueueSelector* selector, void* arg, SendCallback* sendCallback)=0;
+        virtual void sendOneway(Message& msg, MessageQueueSelector* selector, void* arg)=0;
+
+        virtual TransactionSendResult sendMessageInTransaction(Message& msg,
+                LocalTransactionExecuter* tranExecuter,
+                void* arg)=0;
+    };
+}
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQPullConsumer.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MQPullConsumer.h b/rocketmq-client4cpp/include/MQPullConsumer.h new file mode 100755 index 0000000..ffb2ac5 --- /dev/null +++ b/rocketmq-client4cpp/include/MQPullConsumer.h @@ -0,0 +1,54 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in 
compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/
+#ifndef __RMQ_MQPULLCONSUMER_H__
+#define __RMQ_MQPULLCONSUMER_H__
+
+#include <set>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQConsumer.h"
+#include "PullResult.h"
+
+namespace rmq
+{
+    class MessageQueueListener;
+    class MessageQueue;
+    class PullCallback;
+
+    /**
+    * Pull Consumer
+    *
+    * Abstract interface extending MQConsumer. pull and pullBlockIfNotFound
+    * each come in two forms: one returning a PullResult pointer and one
+    * returning void and taking a PullCallback.
+    */
+    class MQPullConsumer : public MQConsumer
+    {
+    public:
+        virtual ~MQPullConsumer(){}
+        virtual void registerMessageQueueListener(const std::string& topic, MessageQueueListener* pListener)=0;
+
+        // NOTE(review): ownership of the returned PullResult is not visible in this header.
+        virtual PullResult* pull(MessageQueue& mq, const std::string& subExpression, long long offset,int maxNums)=0;
+        virtual void pull(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums, PullCallback* pPullCallback)=0;
+
+        virtual PullResult* pullBlockIfNotFound(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums)=0;
+        virtual void pullBlockIfNotFound(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums, PullCallback* pPullCallback)=0;
+
+        virtual void updateConsumeOffset(MessageQueue& mq, long long offset)=0;
+        virtual long long fetchConsumeOffset(MessageQueue& mq, bool fromStore)=0;
+
+        // NOTE(review): ownership of the returned set is not visible in this header.
+        virtual std::set<MessageQueue>* fetchMessageQueuesInBalance(const std::string& topic)=0;
+    };
+}
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQPushConsumer.h ---------------------------------------------------------------------- diff --git 
a/rocketmq-client4cpp/include/MQPushConsumer.h b/rocketmq-client4cpp/include/MQPushConsumer.h new file mode 100755 index 0000000..fe6d4a0 --- /dev/null +++ b/rocketmq-client4cpp/include/MQPushConsumer.h @@ -0,0 +1,49 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/
+#ifndef __RMQ_MQPUSHCONSUMER_H__
+#define __RMQ_MQPUSHCONSUMER_H__
+
+#include <set>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQConsumer.h"
+#include "PullResult.h"
+
+namespace rmq
+{
+    class MessageListener;
+
+    /**
+    * Push Consumer
+    *
+    * Abstract interface extending MQConsumer; every member declared here is
+    * pure virtual. No destructor is declared in this class.
+    */
+    class MQPushConsumer : public MQConsumer
+    {
+    public:
+        // NOTE(review): ownership of pMessageListener is not visible in this header.
+        virtual void registerMessageListener(MessageListener* pMessageListener)=0;
+
+
+        virtual void subscribe(const std::string& topic, const std::string& subExpression)=0;
+        virtual void unsubscribe(const std::string& topic)=0;
+
+
+        virtual void updateCorePoolSize(int corePoolSize)=0;
+        virtual void suspend()=0;
+        virtual void resume()=0;
+    };
+}
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/Message.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/Message.h b/rocketmq-client4cpp/include/Message.h new file mode 100755 index 0000000..441b4e5 --- /dev/null +++ b/rocketmq-client4cpp/include/Message.h @@ -0,0 +1,136 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the 
Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/
+
+#ifndef __RMQ_MESSAGE_H__
+#define __RMQ_MESSAGE_H__
+
+#include <map>
+#include <string>
+#include <list>
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+    /**
+    * Message
+    *
+    * Holds a topic, an integer flag, a string-to-string property map, a body
+    * buffer and a second buffer for the compressed body. All member
+    * functions are only declared here; their definitions are elsewhere.
+    */
+    class Message
+    {
+    public:
+        Message();
+        Message(const std::string& topic, const char* body,int len);
+        Message(const std::string& topic, const std::string& tags, const char* body,int len);
+        Message(const std::string& topic, const std::string& tags,const std::string& keys, const char* body,int len);
+        Message(const std::string& topic,
+                const std::string& tags,
+                const std::string& keys,
+                const int flag,
+                const char* body,
+                int len,
+                bool waitStoreMsgOK);
+
+        // The class declares its own destructor, copy constructor and copy
+        // assignment; it holds raw char* buffers (m_body, m_compressBody).
+        virtual ~Message();
+        Message(const Message& other);
+        Message& operator=(const Message& other);
+
+        // Access to the property map by name.
+        void clearProperty(const std::string& name);
+        void putProperty(const std::string& name, const std::string& value);
+        std::string getProperty(const std::string& name);
+
+        std::string getTopic()const;
+        void setTopic(const std::string& topic);
+
+        std::string getTags();
+        void setTags(const std::string& tags);
+
+        std::string getKeys();
+        void setKeys(const std::string& keys);
+        // NOTE(review): presumably joins the list with KEY_SEPARATOR - confirm in the definition.
+        void setKeys(const std::list<std::string> keys);
+
+        int getDelayTimeLevel();
+        void setDelayTimeLevel(int level);
+
+        bool isWaitStoreMsgOK();
+        void setWaitStoreMsgOK(bool waitStoreMsgOK);
+
+        int getFlag();
+        void setFlag(int flag);
+
+        // Body is exposed as a pointer plus an explicit length.
+        const char* getBody() const;
+        int getBodyLen() const;
+        void setBody(const char* body, int len);
+
+        bool tryToCompress(int compressLevel);
+        const char* getCompressBody() const;
+        int getCompressBodyLen() const;
+
+        // Returns the property map by non-const reference.
+        std::map<std::string, std::string>& getProperties();
+        void setProperties(const std::map<std::string, std::string>& properties);
+
+        std::string toString() const;
+
+    protected:
+        // Takes the same arguments as the seven-argument constructor.
+        void Init(const std::string& topic,
+                  const std::string& tags,
+                  const std::string& keys,
+                  const int flag,
+                  const char* body,
+                  int len,
+                  bool waitStoreMsgOK);
+
+    public:
+        // Property-name constants; their values are defined out of line.
+        static const std::string PROPERTY_KEYS;
+        static const std::string PROPERTY_TAGS;
+        static const std::string PROPERTY_WAIT_STORE_MSG_OK;
+        static const std::string PROPERTY_DELAY_TIME_LEVEL;
+
+        /**
+        * for inner use
+        */
+        static const std::string PROPERTY_RETRY_TOPIC;
+        static const std::string PROPERTY_REAL_TOPIC;
+        static const std::string PROPERTY_REAL_QUEUE_ID;
+        static const std::string PROPERTY_TRANSACTION_PREPARED;
+        static const std::string PROPERTY_PRODUCER_GROUP;
+        static const std::string PROPERTY_MIN_OFFSET;
+        static const std::string PROPERTY_MAX_OFFSET;
+        static const std::string PROPERTY_BUYER_ID;
+        static const std::string PROPERTY_ORIGIN_MESSAGE_ID;
+        static const std::string PROPERTY_TRANSFER_FLAG;
+        static const std::string PROPERTY_CORRECTION_FLAG;
+        static const std::string PROPERTY_MQ2_FLAG;
+        static const std::string PROPERTY_RECONSUME_TIME;
+        static const std::string PROPERTY_MSG_REGION;
+        static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX;
+        static const std::string PROPERTY_MAX_RECONSUME_TIMES;
+        static const std::string PROPERTY_CONSUME_START_TIMESTAMP;
+
+        static const std::string KEY_SEPARATOR;
+    private:
+        std::string m_topic;
+        int m_flag;
+        std::map<std::string, std::string> m_properties;
+
+        char* m_body;
+        int m_bodyLen;
+
+        char* m_compressBody;
+        int m_compressBodyLen;
+    };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageExt.h 
---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MessageExt.h b/rocketmq-client4cpp/include/MessageExt.h new file mode 100755 index 0000000..f70041c --- /dev/null +++ b/rocketmq-client4cpp/include/MessageExt.h @@ -0,0 +1,108 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/
+#ifndef __RMQ_MESSAGEEXT_H__
+#define __RMQ_MESSAGEEXT_H__
+
+#include <sys/socket.h>
+#include <string>
+#include "Message.h"
+#include "TopicFilterType.h"
+#include "RocketMQClient.h"
+
+namespace rmq
+    {
+    /**
+    * Message extend
+    *
+    * Message plus additional fields: queue id and offsets, born/store
+    * timestamps and hosts (kept as sockaddr values), message id, system
+    * flag, body CRC, store size and reconsume count. Member functions are
+    * only declared here; their definitions are elsewhere.
+    */
+    class MessageExt : public Message
+    {
+    public:
+        MessageExt();
+
+        MessageExt(int queueId,
+                   long long bornTimestamp,
+                   sockaddr bornHost,
+                   long long storeTimestamp,
+                   sockaddr storeHost,
+                   std::string msgId);
+
+        ~MessageExt();
+
+        // Static helper mapping a sysFlag value to a TopicFilterType.
+        static TopicFilterType parseTopicFilterType(int sysFlag);
+
+        int getQueueId();
+        void setQueueId(int queueId);
+
+        long long getBornTimestamp();
+        void setBornTimestamp(long long bornTimestamp);
+
+        // The born host is available as a raw sockaddr or as a string.
+        sockaddr getBornHost();
+        std::string getBornHostString();
+        std::string getBornHostNameString();
+        void setBornHost(const sockaddr& bornHost);
+
+        long long getStoreTimestamp();
+        void setStoreTimestamp(long long storeTimestamp);
+
+        sockaddr getStoreHost();
+        std::string getStoreHostString();
+        void setStoreHost(const sockaddr& storeHost);
+
+        std::string getMsgId();
+        void setMsgId(const std::string& msgId);
+
+        int getSysFlag();
+        void setSysFlag(int sysFlag);
+
+        int getBodyCRC();
+        void setBodyCRC(int bodyCRC);
+
+        long long getQueueOffset();
+        void setQueueOffset(long long queueOffset);
+
+        long long getCommitLogOffset();
+        void setCommitLogOffset(long long physicOffset);
+
+        int getStoreSize();
+        void setStoreSize(int storeSize);
+
+        int getReconsumeTimes();
+        void setReconsumeTimes(int reconsumeTimes);
+
+        long long getPreparedTransactionOffset();
+        void setPreparedTransactionOffset(long long preparedTransactionOffset);
+
+        std::string toString() const;
+
+    private:
+        long long m_queueOffset;
+        long long m_commitLogOffset;
+        long long m_bornTimestamp;
+        long long m_storeTimestamp;
+        long long m_preparedTransactionOffset;
+        int m_queueId;
+        int m_storeSize;
+        int m_sysFlag;
+        int m_bodyCRC;
+        int m_reconsumeTimes;
+        sockaddr m_bornHost;
+        sockaddr m_storeHost;
+        std::string m_msgId;
+    };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageListener.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MessageListener.h b/rocketmq-client4cpp/include/MessageListener.h new file mode 100755 index 0000000..130a219 --- /dev/null +++ b/rocketmq-client4cpp/include/MessageListener.h @@ -0,0 +1,94 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#ifndef __RMQ_MESSAGELISTENER_H__ +#define __RMQ_MESSAGELISTENER_H__ + +#include <limits.h> +#include <list> + +#include "MessageExt.h" +#include "MessageQueue.h" + +namespace rmq +{ + /** + * Message Listener + * + */ + class MessageListener + { + public: + virtual ~MessageListener(){} + }; + + enum ConsumeOrderlyStatus + { + SUCCESS, + ROLLBACK, + COMMIT, + SUSPEND_CURRENT_QUEUE_A_MOMENT, + }; + + typedef struct tagConsumeOrderlyContext + { + tagConsumeOrderlyContext(MessageQueue& mq) + :messageQueue(mq), + autoCommit(true), + suspendCurrentQueueTimeMillis(1000) + { + } + + MessageQueue messageQueue;///< Ҫ���ѵ���Ϣ�����ĸ����� + bool autoCommit;///< ��ϢOffset�Ƿ��Զ��ύ + long suspendCurrentQueueTimeMillis; + }ConsumeOrderlyContext; + + class MessageListenerOrderly : public MessageListener + { + public: + virtual ConsumeOrderlyStatus consumeMessage(std::list<MessageExt*>& msgs, + ConsumeOrderlyContext& context)=0; + }; + + enum ConsumeConcurrentlyStatus + { + CONSUME_SUCCESS, + RECONSUME_LATER, + }; + + struct ConsumeConcurrentlyContext + { + ConsumeConcurrentlyContext(MessageQueue& mq) + :messageQueue(mq), + delayLevelWhenNextConsume(0), + ackIndex(INT_MAX) + { + } + MessageQueue messageQueue; + int delayLevelWhenNextConsume; + int ackIndex; + }; + + class MessageListenerConcurrently : public MessageListener + { + public: + virtual ConsumeConcurrentlyStatus consumeMessage(std::list<MessageExt*>& msgs, + ConsumeConcurrentlyContext& context)=0; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageQueue.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MessageQueue.h b/rocketmq-client4cpp/include/MessageQueue.h new file mode 100755 index 0000000..89ddf58 --- /dev/null +++ b/rocketmq-client4cpp/include/MessageQueue.h @@ -0,0 +1,70 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the 
Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/
+
+#ifndef __RMQ_MESSAGEQUEUE_H__
+#define __RMQ_MESSAGEQUEUE_H__
+
+#include <iostream>
+#include <string>
+#include <sstream>
+
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+    /**
+    * Message Queue
+    *
+    * Value type made of a topic, a broker name and a queue id. It declares
+    * operator== and operator<, and other headers in this patch use it as
+    * the element type of std::set and the key type of std::map.
+    */
+    class MessageQueue
+    {
+    public:
+        MessageQueue();
+        ~MessageQueue(){};
+
+        MessageQueue(const std::string& topic, const std::string& brokerName, int queueId);
+
+        std::string getTopic()const;
+        void setTopic(const std::string& topic);
+
+        std::string getBrokerName()const;
+        void setBrokerName(const std::string& brokerName);
+
+        int getQueueId()const;
+        void setQueueId(int queueId);
+
+        // Not declared const, unlike the other read-only accessors.
+        int hashCode();
+        std::string toString() const;
+        std::string toJsonString() const;
+
+        // Comparison operations; their definitions are not in this header.
+        bool operator==(const MessageQueue& mq) const;
+        bool operator<(const MessageQueue& mq) const;
+        int compareTo(const MessageQueue& mq) const;
+
+    private:
+        std::string m_topic;
+        std::string m_brokerName;
+        int m_queueId;
+    };
+
+    /** Streams obj.toString() to os and returns os. */
+    inline std::ostream& operator<<(std::ostream& os, const MessageQueue& obj)
+    {
+        os << obj.toString();
+        return os;
+    }
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageQueueListener.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/MessageQueueListener.h b/rocketmq-client4cpp/include/MessageQueueListener.h new file mode 100755 index 0000000..9f04c3e --- /dev/null +++ 
b/rocketmq-client4cpp/include/MessageQueueListener.h @@ -0,0 +1,38 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __RMQ_MESSAGEQUEUELISTENER_H__ +#define __RMQ_MESSAGEQUEUELISTENER_H__ + +#include <set> +#include "RocketMQClient.h" + +namespace rmq +{ + /** + * Message Queue Listener + * + */ + class MessageQueueListener + { + public: + virtual ~MessageQueueListener() {} + virtual void messageQueueChanged(const std::string& topic, + std::set<MessageQueue>& mqAll, + std::set<MessageQueue>& mqDivided)=0; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/OffsetStore.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/OffsetStore.h b/rocketmq-client4cpp/include/OffsetStore.h new file mode 100755 index 0000000..a533750 --- /dev/null +++ b/rocketmq-client4cpp/include/OffsetStore.h @@ -0,0 +1,58 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/
+#ifndef __RMQ_OFFSETSTORE_H__
+#define __RMQ_OFFSETSTORE_H__
+
+#include <set>
+#include <map>
+
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+    class MessageQueue;
+
+    /** Selects the source that OffsetStore::readOffset reads from. */
+    enum ReadOffsetType
+    {
+        READ_FROM_MEMORY,
+        READ_FROM_STORE,
+        MEMORY_FIRST_THEN_STORE,
+    };
+
+    /**
+    * Consumer Offset Store
+    *
+    * Abstract interface for keeping one long long offset per MessageQueue;
+    * every operation is pure virtual.
+    */
+    class OffsetStore
+    {
+    public:
+        virtual ~OffsetStore() {}
+
+        virtual void load()=0;
+
+        // NOTE(review): increaseOnly presumably rejects updates that would lower
+        // the stored offset - confirm in the implementations.
+        virtual void updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly)=0;
+        virtual long long readOffset(const MessageQueue& mq, ReadOffsetType type)=0;
+
+        virtual void persistAll(std::set<MessageQueue>& mqs)=0;
+        virtual void persist(const MessageQueue& mq)=0;
+
+        virtual void removeOffset(const MessageQueue& mq)=0;
+
+        // Returns the queue-to-offset map by value.
+        virtual std::map<MessageQueue, long long> cloneOffsetTable(const std::string& topic) = 0;
+    };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/PullCallback.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/PullCallback.h b/rocketmq-client4cpp/include/PullCallback.h new file mode 100755 index 0000000..47ade68 --- /dev/null +++ b/rocketmq-client4cpp/include/PullCallback.h @@ -0,0 +1,39 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
+#ifndef __RMQ_PULLCALLBACK_H__
+#define __RMQ_PULLCALLBACK_H__
+
+#include "RocketMQClient.h"
+#include "PullResult.h"
+
+namespace rmq
+{
+    class MQException;
+
+    /**
+     * PullCallback
+     *
+     * Callback interface with one pure virtual method for a PullResult and
+     * one for an MQException.
+     */
+    class PullCallback
+    {
+    public:
+        virtual ~PullCallback() {}
+        // Both arguments are passed by non-const reference.
+        virtual void onSuccess(PullResult& pullResult)=0;
+        virtual void onException(MQException& e)=0;
+    };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/PullResult.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/PullResult.h b/rocketmq-client4cpp/include/PullResult.h new file mode 100755 index 0000000..42c13ca --- /dev/null +++ b/rocketmq-client4cpp/include/PullResult.h @@ -0,0 +1,91 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef __RMQ_PULLRESULT_H__ +#define __RMQ_PULLRESULT_H__ + +#include <list> +#include <string> +#include <sstream> + +#include "RocketMQClient.h" +#include "MessageExt.h" + +namespace rmq +{ + enum PullStatus + { + FOUND, + NO_NEW_MSG, + NO_MATCHED_MSG, + OFFSET_ILLEGAL + }; + + /** + * PullResult + * + */ + struct PullResult + { + PullResult() + { + + } + + PullResult(PullStatus pullStatus, + long long nextBeginOffset, + long long minOffset, + long long maxOffset, + std::list<MessageExt*>& msgFoundList) + :pullStatus(pullStatus), + nextBeginOffset(nextBeginOffset), + minOffset(minOffset), + maxOffset(maxOffset), + msgFoundList(msgFoundList) + { + + } + + ~PullResult() + { + std::list<MessageExt*>::iterator it = msgFoundList.begin(); + + for (;it!=msgFoundList.end();it++) + { + delete *it; + } + } + + std::string toString() const + { + std::stringstream ss; + ss << "{pullStatus=" << pullStatus + << ",nextBeginOffset=" << nextBeginOffset + << ",minOffset=" << nextBeginOffset + << ",maxOffset=" << nextBeginOffset + << ",msgFoundList.size=" << msgFoundList.size() + <<"}"; + return ss.str(); + } + + PullStatus pullStatus; + long long nextBeginOffset; + long long minOffset; + long long maxOffset; + std::list<MessageExt*> msgFoundList; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/QueryResult.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/QueryResult.h b/rocketmq-client4cpp/include/QueryResult.h new file mode 100644 index 0000000..13164e4 --- /dev/null +++ b/rocketmq-client4cpp/include/QueryResult.h @@ -0,0 +1,56 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#ifndef __RMQ_QUERYRESULT_H__ +#define __RMQ_QUERYRESULT_H__ + +#include <list> + +#include "RocketMQClient.h" +#include "MessageExt.h" + +namespace rmq +{ + /** + * QueryResult + * + */ + class QueryResult + { + public: + QueryResult(long long indexLastUpdateTimestamp, const std::list<MessageExt*>& messageList) + { + m_indexLastUpdateTimestamp = indexLastUpdateTimestamp; + m_messageList = messageList; + } + + long long getIndexLastUpdateTimestamp() + { + return m_indexLastUpdateTimestamp; + } + + std::list<MessageExt*>& getMessageList() + { + return m_messageList; + } + + private: + long long m_indexLastUpdateTimestamp; + std::list<MessageExt*> m_messageList; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/RocketMQClient.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/RocketMQClient.h b/rocketmq-client4cpp/include/RocketMQClient.h new file mode 100755 index 0000000..e4c71c9 --- /dev/null +++ b/rocketmq-client4cpp/include/RocketMQClient.h @@ -0,0 +1,100 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __RMQ_ROCKETMQCLIENT_H__ +#define __RMQ_ROCKETMQCLIENT_H__ + +#include <stdlib.h> +#include <stdio.h> +#include <stdint.h> +#include <string.h> +#include <assert.h> +#include <time.h> +#include <stdarg.h> +#include <fcntl.h> +#include <errno.h> +#include <signal.h> +#include <pthread.h> + +#include <sys/time.h> +#include <sys/timeb.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/file.h> +#include <sys/syscall.h> +#include <linux/unistd.h> + +#include <cstdio> +#include <iostream> +#include <string> +#include <sstream> +#include <vector> +#include <map> +#include <set> + + +class RocketMQUtil +{ +public: + enum + { + NONE_LOG = 0, + ERROR_LOG = 1, + WARN_LOG = 2, + INFO_LOG = 3, + DEBUG_LOG = 4, + }; + +public: + static pid_t getPid(); + static pid_t getTid(); + + static int getDiffDays(time_t tmFirst, time_t tmSecond); + static std::string tm2str(const time_t &t, const std::string &sFormat); + static std::string now2str(const std::string &sFormat); + static std::string now2str(); + static int64_t getNowMs(); + static std::string str2fmt(const char* format, ...)__attribute__((format(__printf__,1,2))); + + static int initLog(const std::string& sLogPath); + static void setLogLevel(int logLevel); + static void writeLog(const char* fmt, ...) 
__attribute__((format(__printf__,1,2))); + static inline bool isNeedLog(int level) + { + return (level <= _logLevel); + }; + +public: + static volatile int _logFd; + static int _logLevel; + static std::string _logPath; +}; + +#define RMQ_AUTO(name, value) typeof(value) name = value +#define RMQ_FOR_EACH(container, it) \ + for(typeof((container).begin()) it = (container).begin();it!=(container).end(); ++it) + + + +#define RMQ_DEBUG(fmt, args...) do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::DEBUG_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][DEBUG]|"fmt"\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__,__LINE__, ##args);}while(0) +#define RMQ_INFO(fmt, args...) do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::INFO_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][INFO]|"fmt"\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0) +#define RMQ_WARN(fmt, args...) do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::WARN_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][WARN]|"fmt"\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0) +#define RMQ_ERROR(fmt, args...) do{ if(RocketMQUtil::isNeedLog(RocketMQUtil::ERROR_LOG)) RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][ERROR]|"fmt"\n", RocketMQUtil::getPid(), RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0) + +#define RMQ_PRINT(fmt, args...) 
do{ printf("%d|[%s][%s:%s:%d][DEBUG]|"fmt"\n", RocketMQUtil::getTid(), RocketMQUtil::now2str().c_str(), __FILE__, __func__,__LINE__, ##args);}while(0) + + +#endif + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/SendCallback.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/SendCallback.h b/rocketmq-client4cpp/include/SendCallback.h new file mode 100755 index 0000000..0feb5a1 --- /dev/null +++ b/rocketmq-client4cpp/include/SendCallback.h @@ -0,0 +1,39 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef __RMQ_SENDCALLBACK_H__ +#define __RMQ_SENDCALLBACK_H__ + +#include "SendResult.h" +#include "RocketMQClient.h" + +namespace rmq +{ + class MQException; + + /** + * Send Mesage Callback + * + */ + class SendCallback + { + public: + virtual ~SendCallback() {} + virtual void onSuccess(SendResult& sendResult)=0; + virtual void onException(MQException& e)=0; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/SendMessageHook.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/SendMessageHook.h b/rocketmq-client4cpp/include/SendMessageHook.h new file mode 100644 index 0000000..9869aa6 --- /dev/null +++ b/rocketmq-client4cpp/include/SendMessageHook.h @@ -0,0 +1,50 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef __RMQ_SENDMESSAGEHOOK_H__ +#define __RMQ_SENDMESSAGEHOOK_H__ + +#include <string> + +#include "RocketMQClient.h" +#include "Message.h" +#include "MQClientException.h" + +namespace rmq +{ + class SendMessageContext + { + public: + std::string producerGroup; + Message msg; + MessageQueue mq; + std::string brokerAddr; + CommunicationMode communicationMode; + SendResult sendResult; + MQException* pException; + void* pArg; + }; + + class SendMessageHook + { + public: + virtual ~SendMessageHook() {} + virtual std::string hookName()=0; + virtual void sendMessageBefore(const SendMessageContext& context)=0; + virtual void sendMessageAfter(const SendMessageContext& context)=0; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/SendResult.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/SendResult.h b/rocketmq-client4cpp/include/SendResult.h new file mode 100755 index 0000000..d6a3174 --- /dev/null +++ b/rocketmq-client4cpp/include/SendResult.h @@ -0,0 +1,89 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef __RMQ_SENDRESULT_H__ +#define __RMQ_SENDRESULT_H__ + +#include "RocketMQClient.h" +#include "MessageQueue.h" + +namespace rmq +{ + enum SendStatus + { + SEND_OK, + FLUSH_DISK_TIMEOUT, + FLUSH_SLAVE_TIMEOUT, + SLAVE_NOT_AVAILABLE + }; + + /** + * Send Message Result + * + */ + class SendResult + { + public: + SendResult(); + SendResult(const SendStatus& sendStatus, + const std::string& msgId, + MessageQueue& messageQueue, + long long queueOffset, + std::string& projectGroupPrefix); + + const std::string& getMsgId(); + void setMsgId(const std::string& msgId); + SendStatus getSendStatus(); + void setSendStatus(const SendStatus& sendStatus); + MessageQueue& getMessageQueue(); + void setMessageQueue(MessageQueue& messageQueue); + long long getQueueOffset(); + void setQueueOffset(long long queueOffset); + bool hasResult(); + + std::string toString() const; + std::string toJsonString() const; + + private: + SendStatus m_sendStatus; + std::string m_msgId; + MessageQueue m_messageQueue; + long long m_queueOffset; + }; + + enum LocalTransactionState + { + COMMIT_MESSAGE, + ROLLBACK_MESSAGE, + UNKNOW, + }; + + /** + * Send transaction message result + * + */ + class TransactionSendResult : public SendResult + { + public: + TransactionSendResult(); + LocalTransactionState getLocalTransactionState(); + void setLocalTransactionState(LocalTransactionState localTransactionState); + + private: + LocalTransactionState m_localTransactionState; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/TopicFilterType.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/include/TopicFilterType.h b/rocketmq-client4cpp/include/TopicFilterType.h new file mode 100755 index 0000000..e51ae20 --- /dev/null +++ b/rocketmq-client4cpp/include/TopicFilterType.h @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the 
Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __RMQ_TOPICFILTERTYPE_H__ +#define __RMQ_TOPICFILTERTYPE_H__ + +namespace rmq +{ + /** + * Topic filter type + * + */ + enum TopicFilterType + { + SINGLE_TAG, + MULTI_TAG + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/rocketmq.mk ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/rocketmq.mk b/rocketmq-client4cpp/rocketmq.mk new file mode 100644 index 0000000..eecc458 --- /dev/null +++ b/rocketmq-client4cpp/rocketmq.mk @@ -0,0 +1,6 @@ +ROCKETMQ_PATH := /data/libs/rocketmq + +INCLUDE += -I$(ROCKETMQ_PATH)/include +INCLUDE_32 += -I$(ROCKETMQ_PATH)/include -march=i686 +LIB_32 += -L$(ROCKETMQ_PATH)/lib32 -lrocketmq -lz -lrt -lpthread +LIB_64 += -L$(ROCKETMQ_PATH)/lib64 -lrocketmq -lz -lrt -lpthread http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/ClientConfig.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/ClientConfig.cpp b/rocketmq-client4cpp/src/ClientConfig.cpp new file mode 100755 index 0000000..986d67d --- /dev/null +++ b/rocketmq-client4cpp/src/ClientConfig.cpp @@ -0,0 +1,168 @@ +/** + * Copyright (C) 2010-2013 kangliqiang, [email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the 
License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <stdlib.h> +#include <sstream> + +#include "MQClientException.h" +#include "SocketUtil.h" +#include "ClientConfig.h" +#include "UtilAll.h" +#include "MixAll.h" + +namespace rmq +{ + +ClientConfig::ClientConfig() +{ + char* addr = getenv(MixAll::NAMESRV_ADDR_ENV.c_str()); + if (addr) + { + m_namesrvAddr = addr; + } + else + { + m_namesrvAddr = ""; + } + + m_clientIP = getLocalAddress(); + m_instanceName = "DEFAULT"; + m_clientCallbackExecutorThreads = UtilAll::availableProcessors(); + m_pollNameServerInterval = 1000 * 30; + m_heartbeatBrokerInterval = 1000 * 30; + m_persistConsumerOffsetInterval = 1000 * 5; +} + +ClientConfig::~ClientConfig() +{ +} + +std::string ClientConfig::buildMQClientId() +{ + return m_clientIP + "@" + m_instanceName; +} + +void ClientConfig::changeInstanceNameToPID() +{ + if (m_instanceName == "DEFAULT") + { + m_instanceName = UtilAll::toString(UtilAll::getPid()); + } +} + + +void ClientConfig::resetClientConfig(const ClientConfig& cc) +{ + m_namesrvAddr = cc.m_namesrvAddr; + m_clientIP = cc.m_clientIP; + m_instanceName = cc.m_instanceName; + m_clientCallbackExecutorThreads = cc.m_clientCallbackExecutorThreads; + m_pollNameServerInterval = cc.m_pollNameServerInterval; + m_heartbeatBrokerInterval = cc.m_heartbeatBrokerInterval; + m_persistConsumerOffsetInterval = cc.m_persistConsumerOffsetInterval; +} + +ClientConfig ClientConfig::cloneClientConfig() +{ + return *this; +} + +std::string ClientConfig::getNamesrvAddr() +{ + return m_namesrvAddr; +} + +void ClientConfig::setNamesrvAddr(const std::string& 
namesrvAddr) +{ + m_namesrvAddr = namesrvAddr; +} + +std::string ClientConfig::getClientIP() +{ + return m_clientIP; +} + +void ClientConfig::setClientIP(const std::string& clientIP) +{ + m_clientIP = clientIP; +} + +std::string ClientConfig::getInstanceName() +{ + return m_instanceName; +} + +void ClientConfig::setInstanceName(const std::string& instanceName) +{ + m_instanceName = instanceName; +} + +int ClientConfig::getClientCallbackExecutorThreads() +{ + return m_clientCallbackExecutorThreads; +} + +void ClientConfig::setClientCallbackExecutorThreads(int clientCallbackExecutorThreads) +{ + m_clientCallbackExecutorThreads = clientCallbackExecutorThreads; +} + +int ClientConfig::getPollNameServerInterval() +{ + return m_pollNameServerInterval; +} + +void ClientConfig::setPollNameServerInterval(int pollNameServerInterval) +{ + m_pollNameServerInterval = pollNameServerInterval; +} + +int ClientConfig::getHeartbeatBrokerInterval() +{ + return m_heartbeatBrokerInterval; +} + +void ClientConfig::setHeartbeatBrokerInterval(int heartbeatBrokerInterval) +{ + m_heartbeatBrokerInterval = heartbeatBrokerInterval; +} + +int ClientConfig:: getPersistConsumerOffsetInterval() +{ + return m_persistConsumerOffsetInterval; +} + +void ClientConfig::setPersistConsumerOffsetInterval(int persistConsumerOffsetInterval) +{ + m_persistConsumerOffsetInterval = persistConsumerOffsetInterval; +} + + +std::string ClientConfig::toString() const +{ + std::stringstream ss; + ss << "{namesrvAddr=" << m_namesrvAddr + << ",clientIP=" << m_clientIP + << ",instanceName=" << m_instanceName + << ",clientCallbackExecutorThreads=" << m_clientCallbackExecutorThreads + << ",pollNameServerInteval=" << m_pollNameServerInterval + << ",heartbeatBrokerInterval=" << m_heartbeatBrokerInterval + << ",persistConsumerOffsetInterval=" << m_persistConsumerOffsetInterval + <<"}"; + return ss.str(); +} + + +} 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp b/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp new file mode 100755 index 0000000..ae88de5 --- /dev/null +++ b/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp @@ -0,0 +1,154 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#include "ClientRemotingProcessor.h" +#include "MQProtos.h" +#include "TcpTransport.h" +#include "RemotingCommand.h" +#include "MQClientFactory.h" +#include "CommandCustomHeader.h" +#include "ConsumerRunningInfo.h" + + + +namespace rmq +{ + +ClientRemotingProcessor::ClientRemotingProcessor(MQClientFactory* pMQClientFactory) + : m_pMQClientFactory(pMQClientFactory) +{ + +} + +RemotingCommand* ClientRemotingProcessor::processRequest(TcpTransport* pTts, RemotingCommand* pRequest) +{ + int code = pRequest->getCode(); + switch (code) + { + case CHECK_TRANSACTION_STATE_VALUE: + return checkTransactionState(pTts, pRequest); + case NOTIFY_CONSUMER_IDS_CHANGED_VALUE: + return notifyConsumerIdsChanged(pTts, pRequest); + case RESET_CONSUMER_CLIENT_OFFSET_VALUE: + return resetOffset(pTts, pRequest); + case GET_CONSUMER_STATUS_FROM_CLIENT_VALUE: + return getConsumeStatus(pTts, pRequest); + case GET_CONSUMER_RUNNING_INFO_VALUE: + return getConsumerRunningInfo(pTts, pRequest); + case CONSUME_MESSAGE_DIRECTLY_VALUE: + return consumeMessageDirectly(pTts, pRequest); + default: + break; + } + + return NULL; +} + +RemotingCommand* ClientRemotingProcessor::checkTransactionState(TcpTransport* pTts, RemotingCommand* pRequest) +{ + //TODO + return NULL; +} + +RemotingCommand* ClientRemotingProcessor::notifyConsumerIdsChanged(TcpTransport* pTts, RemotingCommand* pRequest) +{ + try + { + NotifyConsumerIdsChangedRequestHeader* extHeader = (NotifyConsumerIdsChangedRequestHeader*)pRequest->getCommandCustomHeader(); + RMQ_INFO("receive broker's notification[{%s}], the consumer group: {%s} changed, rebalance immediately", + pTts->getServerAddr().c_str(), + extHeader->consumerGroup.c_str()); + m_pMQClientFactory->rebalanceImmediately(); + } + catch (std::exception& e) + { + RMQ_ERROR("notifyConsumerIdsChanged exception: %s", e.what()); + } + + return NULL; +} + +RemotingCommand* ClientRemotingProcessor::resetOffset(TcpTransport* pTts, RemotingCommand* pRequest) +{ + //TODO + return NULL; +} 
+ + +RemotingCommand* ClientRemotingProcessor::getConsumeStatus(TcpTransport* pTts, RemotingCommand* pRequest) +{ + //TODO + return NULL; +} + + +RemotingCommand* ClientRemotingProcessor::getConsumerRunningInfo(TcpTransport* pTts, RemotingCommand* pRequest) +{ + return NULL; + + /* + GetConsumerRunningInfoRequestHeader* requestHeader = (GetConsumerRunningInfoRequestHeader)pRequest->getCommandCustomHeader(); + RemotingCommand* pResponse = RemotingCommand::createResponseCommand(NULL); + + pResponse = RemotingCommand::createResponseCommand( + REQUEST_CODE_NOT_SUPPORTED_VALUE, "request type not supported", NULL); + pResponse->setOpaque(pCmd->getOpaque()); + + ConsumerRunningInfo* consumerRunningInfo = m_pMQClientFactory->consumerRunningInfo(requestHeader->consumerGroup); + if (NULL != consumerRunningInfo) { + response.setCode(ResponseCode.SUCCESS); + response.setBody(consumerRunningInfo.encode()); + } else { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", + requestHeader.getConsumerGroup())); + } + return pResponse; + + // java + final RemotingCommand response = RemotingCommand.createResponseCommand(null); + final GetConsumerRunningInfoRequestHeader requestHeader = + (GetConsumerRunningInfoRequestHeader) request + .decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); + + ConsumerRunningInfo consumerRunningInfo = + this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup()); + if (null != consumerRunningInfo) { + if (requestHeader.isJstackEnable()) { + String jstack = UtilAll.jstack(); + consumerRunningInfo.setJstack(jstack); + } + + response.setCode(ResponseCode.SUCCESS); + response.setBody(consumerRunningInfo.encode()); + } else { + response.setCode(ResponseCode.SYSTEM_ERROR); + response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", + requestHeader.getConsumerGroup())); + } + + return response; + */ +} + + 
+RemotingCommand* ClientRemotingProcessor::consumeMessageDirectly(TcpTransport* pTts, RemotingCommand* pRequest) +{ + //TODO + return NULL; +} + + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/ClientRemotingProcessor.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/ClientRemotingProcessor.h b/rocketmq-client4cpp/src/ClientRemotingProcessor.h new file mode 100755 index 0000000..4cd2873 --- /dev/null +++ b/rocketmq-client4cpp/src/ClientRemotingProcessor.h @@ -0,0 +1,45 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#ifndef __CLIENTREMOTINGPROCESSOR_H__ +#define __CLIENTREMOTINGPROCESSOR_H__ + +#include "TcpRequestProcessor.h" + +namespace rmq +{ + class MQClientFactory; + class RemotingCommand; + + class ClientRemotingProcessor : public TcpRequestProcessor + { + public: + ClientRemotingProcessor(MQClientFactory* pMQClientFactory); + + RemotingCommand* processRequest(TcpTransport* pTts, RemotingCommand* pRequest); + RemotingCommand* checkTransactionState(TcpTransport* pTts, RemotingCommand* pRequest); + RemotingCommand* notifyConsumerIdsChanged(TcpTransport* pTts, RemotingCommand* pRequest); + RemotingCommand* resetOffset(TcpTransport* pTts, RemotingCommand* pRequest); + RemotingCommand* getConsumeStatus(TcpTransport* pTts, RemotingCommand* pRequest); + RemotingCommand* getConsumerRunningInfo(TcpTransport* pTts, RemotingCommand* pRequest); + RemotingCommand* consumeMessageDirectly(TcpTransport* pTts, RemotingCommand* pRequest); + + private: + MQClientFactory* m_pMQClientFactory; + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/CommunicationMode.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/CommunicationMode.h b/rocketmq-client4cpp/src/CommunicationMode.h new file mode 100755 index 0000000..43b2941 --- /dev/null +++ b/rocketmq-client4cpp/src/CommunicationMode.h @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __COMMUNICATIONMODE_H__ +#define __COMMUNICATIONMODE_H__ + +namespace rmq +{ + /** + * Communication Mode + * + */ + enum CommunicationMode + { + SYNC, + ASYNC, + ONEWAY + }; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/FindBrokerResult.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/FindBrokerResult.h b/rocketmq-client4cpp/src/FindBrokerResult.h new file mode 100644 index 0000000..51a9845 --- /dev/null +++ b/rocketmq-client4cpp/src/FindBrokerResult.h @@ -0,0 +1,28 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#ifndef __FINDBROKERRESULT_H__ +#define __FINDBROKERRESULT_H__ + +namespace rmq +{ + typedef struct + { + std::string brokerAddr; + bool slave; + } FindBrokerResult; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQAdminImpl.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/MQAdminImpl.cpp b/rocketmq-client4cpp/src/MQAdminImpl.cpp new file mode 100755 index 0000000..2a6b597 --- /dev/null +++ b/rocketmq-client4cpp/src/MQAdminImpl.cpp @@ -0,0 +1,295 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ + +#include <list> +#include "SocketUtil.h" +#include "MQAdminImpl.h" +#include "MQClientFactory.h" +#include "MQClientAPIImpl.h" +#include "MQClientException.h" +#include "TopicConfig.h" +#include "TopicPublishInfo.h" +#include "MessageId.h" +#include "MessageDecoder.h" + +namespace rmq +{ + + +MQAdminImpl::MQAdminImpl(MQClientFactory* pMQClientFactory) +{ + m_pMQClientFactory = pMQClientFactory; +} + +MQAdminImpl::~MQAdminImpl() +{ + +} + +void MQAdminImpl::createTopic(const std::string& key, const std::string& newTopic, + int queueNum) +{ + return createTopic(key, newTopic, queueNum, 0); +} + + +void MQAdminImpl::createTopic(const std::string& key, const std::string& newTopic, + int queueNum, int topicSysFlag) +{ + try + { + MQClientAPIImpl* api = m_pMQClientFactory->getMQClientAPIImpl(); + TopicRouteDataPtr topicRouteData = api->getTopicRouteInfoFromNameServer(key, 1000 * 3); + + std::list<BrokerData> brokerDataList = topicRouteData->getBrokerDatas(); + if (!brokerDataList.empty()) + { + brokerDataList.sort(); + + MQClientException exception("", 0, "", 0); + bool hasException = false; + + std::list<BrokerData>::iterator it = brokerDataList.begin(); + + for (; it != brokerDataList.end(); it++) + { + std::map<int, std::string>::iterator it1 = (*it).brokerAddrs.find(MixAll::MASTER_ID); + if (it1 != (*it).brokerAddrs.end()) + { + std::string addr = it1->second; + + TopicConfig topicConfig(newTopic); + topicConfig.setReadQueueNums(queueNum); + topicConfig.setWriteQueueNums(queueNum); + topicConfig.setTopicSysFlag(topicSysFlag); + + try + { + api->createTopic(addr, key, topicConfig, 1000 * 3); + } + catch (MQClientException& e) + { + hasException = true; + exception = e; + } + } + } + + if (hasException) + { + throw exception; + } + } + else + { + THROW_MQEXCEPTION(MQClientException, "Not found broker, maybe key is wrong", -1); + } + } + catch (MQClientException e) + { + THROW_MQEXCEPTION(MQClientException, "create new topic failed", -1); + } +} + 
+std::vector<MessageQueue>* MQAdminImpl::fetchPublishMessageQueues(const std::string& topic) +{ + try + { + MQClientAPIImpl* api = m_pMQClientFactory->getMQClientAPIImpl(); + TopicRouteDataPtr topicRouteData = api->getTopicRouteInfoFromNameServer(topic, 1000 * 3); + + if (topicRouteData.ptr() != NULL) + { + TopicPublishInfoPtr topicPublishInfo = + MQClientFactory::topicRouteData2TopicPublishInfo(topic, *topicRouteData); + if (topicPublishInfo.ptr() != NULL && topicPublishInfo->ok()) + { + std::vector<MessageQueue>* ret = new std::vector<MessageQueue>(); + (*ret) = topicPublishInfo->getMessageQueueList(); + + /* + std::vector<MessageQueue>& mqs = ; + std::vector<MessageQueue>::iterator it = mqs.begin(); + for (; it != mqs.end(); it++) + { + ret->push_back(*it); + } + */ + + return ret; + } + } + } + catch (MQClientException e) + { + THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for this topic" + topic, -1); + } + + THROW_MQEXCEPTION(MQClientException, "Unknow why, Can not find Message Queue for this topic, " + topic, -1); +} + +std::set<MessageQueue>* MQAdminImpl::fetchSubscribeMessageQueues(const std::string& topic) +{ + try + { + TopicRouteDataPtr topicRouteData = + m_pMQClientFactory->getMQClientAPIImpl()->getTopicRouteInfoFromNameServer(topic, 1000 * 3); + if (topicRouteData.ptr() != NULL) + { + std::set<MessageQueue>* mqList = + MQClientFactory::topicRouteData2TopicSubscribeInfo(topic, *topicRouteData); + if (!mqList->empty()) + { + return mqList; + } + else + { + THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for this topic" + topic, -1); + } + } + } + catch (MQClientException e) + { + THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for this topic" + topic, -1); + } + + THROW_MQEXCEPTION(MQClientException, "Unknow why, Can not find Message Queue for this topic: " + topic, -1); +} + +long long MQAdminImpl::searchOffset(const MessageQueue& mq, long long timestamp) +{ + std::string brokerAddr = 
m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); + if (brokerAddr.empty()) + { + m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic()); + brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); + } + + if (!brokerAddr.empty()) + { + try + { + return m_pMQClientFactory->getMQClientAPIImpl()->searchOffset(brokerAddr, mq.getTopic(), + mq.getQueueId(), timestamp, 1000 * 3); + } + catch (MQClientException e) + { + THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1); + } + } + THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1); +} + +long long MQAdminImpl::maxOffset(const MessageQueue& mq) +{ + std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); + if (brokerAddr.empty()) + { + m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic()); + brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); + } + + if (!brokerAddr.empty()) + { + try + { + return m_pMQClientFactory->getMQClientAPIImpl()->getMaxOffset(brokerAddr, mq.getTopic(), + mq.getQueueId(), 1000 * 3); + } + catch (MQClientException e) + { + THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1); + } + } + THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1); +} + +long long MQAdminImpl::minOffset(const MessageQueue& mq) +{ + std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); + if (brokerAddr.empty()) + { + m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic()); + brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); + } + + if (!brokerAddr.empty()) + { + try + { + return m_pMQClientFactory->getMQClientAPIImpl()->getMinOffset(brokerAddr, mq.getTopic(), + mq.getQueueId(), 1000 * 3); + } + catch (MQClientException e) + { + 
THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1); + } + } + + THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1); +} + +long long MQAdminImpl::earliestMsgStoreTime(const MessageQueue& mq) +{ + std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); + if (brokerAddr.empty()) + { + m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic()); + brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName()); + } + + if (!brokerAddr.empty()) + { + try + { + return m_pMQClientFactory->getMQClientAPIImpl()->getEarliestMsgStoretime(brokerAddr, + mq.getTopic(), mq.getQueueId(), 1000 * 3); + } + catch (MQClientException e) + { + THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr + "] exception", -1); + } + } + + THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1); +} + +MessageExt* MQAdminImpl::viewMessage(const std::string& msgId) +{ + try + { + MessageId messageId = MessageDecoder::decodeMessageId(msgId); + return m_pMQClientFactory->getMQClientAPIImpl()->viewMessage( + socketAddress2String(messageId.getAddress()), messageId.getOffset(), 1000 * 3); + } + catch (UnknownHostException e) + { + THROW_MQEXCEPTION(MQClientException, "message id illegal", -1); + } +} + +QueryResult MQAdminImpl::queryMessage(const std::string& topic, + const std::string& key, + int maxNum, long long begin, long long end) +{ + //TODO + std::list<MessageExt*> messageList; + QueryResult result(0, messageList); + + return result; +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQAdminImpl.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/MQAdminImpl.h b/rocketmq-client4cpp/src/MQAdminImpl.h new file mode 100755 index 0000000..907d61e --- /dev/null +++ 
b/rocketmq-client4cpp/src/MQAdminImpl.h
@@ -0,0 +1,63 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MQADMINIMPL_H__
+#define __MQADMINIMPL_H__
+
+#include <string>
+#include <list>
+#include <set>
+#include <vector>
+
+#include "MessageExt.h"
+#include "QueryResult.h"
+
+namespace rmq
+{
+    class MQClientFactory;
+    class MessageQueue;
+    /**
+     * Administrative queries (topic routes, queue offsets, message lookup) that are
+     * carried out through an MQClientFactory.
+     */
+    class MQAdminImpl
+    {
+    public:
+        MQAdminImpl(MQClientFactory* pMQClientFactory); ///< Definition not in view; presumably stores the factory in m_pMQClientFactory.
+        ~MQAdminImpl();
+        /**
+         * Topic creation. The definitions are outside the reviewed section;
+         * presumably they create newTopic with queueNum queues -- confirm in the .cpp.
+         */
+        void createTopic(const std::string& key, const std::string& newTopic, int queueNum);
+        void createTopic(const std::string& key, const std::string& newTopic, int queueNum, int topicSysFlag);
+        /**
+         * Route and offset queries. Each one throws MQClientException when the
+         * route or broker cannot be found or the request raises MQClientException.
+         *  - fetchPublishMessageQueues: returns a vector allocated with new holding
+         *    the publish queues of the topic.
+         *  - fetchSubscribeMessageQueues: returns the set produced by
+         *    MQClientFactory::topicRouteData2TopicSubscribeInfo.
+         *  - searchOffset / maxOffset / minOffset: look up the broker address of the
+         *    queue (refreshing the topic route once if it is unknown) and forward
+         *    the request to that broker.
+         */
+        std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic);
+        std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic);
+        long long searchOffset(const MessageQueue& mq, long long timestamp);
+        long long maxOffset(const MessageQueue& mq);
+        long long minOffset(const MessageQueue& mq);
+        /** Same broker lookup as the offset queries; forwards to getEarliestMsgStoretime. */
+        long long earliestMsgStoreTime(const MessageQueue& mq);
+        /**
+         * Decodes msgId into a broker address and offset and requests the message
+         * from that address. Throws MQClientException("message id illegal") when an
+         * UnknownHostException is raised.
+         */
+        MessageExt* viewMessage(const std::string& msgId);
+        /** Not implemented yet: ignores its arguments and returns an empty QueryResult. */
+        QueryResult queryMessage(const std::string& topic,
+                                 const std::string& key,
+                                 int maxNum,
+                                 long long begin,
+                                 long long end);
+
+    private:
+        MQClientFactory* m_pMQClientFactory; ///< Factory through which every lookup above is made.
+    };
+}
+
+#endif
