http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQClientException.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQClientException.h 
b/rocketmq-client4cpp/include/MQClientException.h
new file mode 100755
index 0000000..f1d1d04
--- /dev/null
+++ b/rocketmq-client4cpp/include/MQClientException.h
@@ -0,0 +1,105 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __RMQ_MQCLIENTEXCEPTION_H__
+#define __RMQ_MQCLIENTEXCEPTION_H__
+
+#include <string>
+#include <ostream>
+#include <sstream>
+#include <exception>
+
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+       class MQException : public std::exception
+       {
+       public:
+               MQException(const std::string& msg, int error,const char* 
file,int line)throw()
+                       : m_error(error),m_line(line),m_file(file)
+               {
+                       try
+                       {
+                               std::stringstream ss;
+                               ss << "[" << file << ":" << line <<"]|error: " 
<< error << "|msg:" << msg;
+                               m_msg = ss.str();
+                       }
+                       catch (...)
+                       {
+                       }
+               }
+
+               virtual ~MQException()throw()
+               {
+               }
+
+               const char* what() const throw()
+               {
+                       return m_msg.c_str();
+               }
+
+               int GetError() const throw()
+               {
+                       return m_error;
+               }
+
+               virtual const char* GetType() const throw()
+               {
+                       return "MQException";
+               }
+
+       protected:
+               int m_error;
+               int m_line;
+               std::string m_msg;
+               std::string m_file;
+       };
+
+       inline std::ostream& operator<<(std::ostream& os, const MQException& e)
+       {
+               os <<"Type:"<<e.GetType() <<  e.what();
+               return os;
+       }
+
+       #define DEFINE_MQCLIENTEXCEPTION(name, parent) \
+       class name : public parent \
+       {\
+       public:\
+               name(const std::string& msg, int error,const char* file,int 
line) throw ()\
+               : parent(msg, error, file, line) {}\
+               virtual const char* GetType() const throw()\
+       {\
+               return #name;\
+       }\
+       };
+
+       DEFINE_MQCLIENTEXCEPTION(MQClientException, MQException)
+       DEFINE_MQCLIENTEXCEPTION(MQBrokerException, MQException)
+       DEFINE_MQCLIENTEXCEPTION(InterruptedException, MQException)
+       DEFINE_MQCLIENTEXCEPTION(UnknownHostException, MQException)
+
+       DEFINE_MQCLIENTEXCEPTION(RemotingException, MQException)
+       DEFINE_MQCLIENTEXCEPTION(RemotingCommandException, RemotingException)
+       DEFINE_MQCLIENTEXCEPTION(RemotingConnectException, RemotingException)
+       DEFINE_MQCLIENTEXCEPTION(RemotingSendRequestException, 
RemotingException)
+       DEFINE_MQCLIENTEXCEPTION(RemotingTimeoutException, RemotingException)
+       DEFINE_MQCLIENTEXCEPTION(RemotingTooMuchRequestException, 
RemotingException)
+
+       #define THROW_MQEXCEPTION(e,msg,err) throw e(msg,err,__FILE__,__LINE__)
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQConsumer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQConsumer.h 
b/rocketmq-client4cpp/include/MQConsumer.h
new file mode 100755
index 0000000..87efe97
--- /dev/null
+++ b/rocketmq-client4cpp/include/MQConsumer.h
@@ -0,0 +1,48 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_MQCONSUMER_H__
+#define __RMQ_MQCONSUMER_H__
+
+#include <set>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQAdmin.h"
+#include "ConsumeType.h"
+
+
+namespace rmq
+{
+       class MessageExt;
+
+       /**
+       * Consumer interface
+       *
+       */
+       class MQConsumer : public MQAdmin
+       {
+       public:
+               virtual ~MQConsumer(){}
+
+               virtual void  start()=0;
+               virtual void shutdown()=0;
+
+               virtual void sendMessageBack(MessageExt& msg, int delayLevel)=0;
+               virtual std::set<MessageQueue>* 
fetchSubscribeMessageQueues(const std::string& topic)=0;
+       };
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQProducer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQProducer.h 
b/rocketmq-client4cpp/include/MQProducer.h
new file mode 100755
index 0000000..b353aba
--- /dev/null
+++ b/rocketmq-client4cpp/include/MQProducer.h
@@ -0,0 +1,71 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License")=0;
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_MQPRODUCER_H__
+#define __RMQ_MQPRODUCER_H__
+
+#include <vector>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQAdmin.h"
+#include "SendResult.h"
+
+namespace rmq
+{
+       class MessageQueue;
+       class SendCallback;
+       class LocalTransactionExecuter;
+       class MessageQueueSelector;
+
+       /**
+       * Producer interface
+       *
+       */
+       class MQProducer : public MQAdmin
+       {
+       public:
+               MQProducer()
+               {
+               }
+
+               virtual ~MQProducer()
+               {
+               }
+
+               virtual void start()=0;
+               virtual void shutdown()=0;
+
+               virtual std::vector<MessageQueue>* 
fetchPublishMessageQueues(const std::string& topic)=0;
+
+               virtual SendResult send(Message& msg)=0;
+               virtual void send(Message& msg, SendCallback* sendCallback)=0;
+               virtual void sendOneway(Message& msg)=0;
+
+               virtual SendResult send(Message& msg, MessageQueue& mq)=0;
+               virtual void send(Message& msg, MessageQueue& mq, SendCallback* 
sendCallback)=0;
+               virtual void sendOneway(Message& msg, MessageQueue& mq)=0;
+
+               virtual SendResult send(Message& msg, MessageQueueSelector* 
selector, void* arg)=0;
+               virtual void send(Message& msg, MessageQueueSelector* selector, 
void* arg, SendCallback* sendCallback)=0;
+               virtual void sendOneway(Message& msg, MessageQueueSelector* 
selector, void* arg)=0;
+
+               virtual TransactionSendResult sendMessageInTransaction(Message& 
msg,
+                                                                               
                                                LocalTransactionExecuter* 
tranExecuter,
+                                                                               
                                                void* arg)=0;
+       };
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQPullConsumer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQPullConsumer.h 
b/rocketmq-client4cpp/include/MQPullConsumer.h
new file mode 100755
index 0000000..ffb2ac5
--- /dev/null
+++ b/rocketmq-client4cpp/include/MQPullConsumer.h
@@ -0,0 +1,54 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_MQPULLCONSUMER_H__
+#define __RMQ_MQPULLCONSUMER_H__
+
+#include <set>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQConsumer.h"
+#include "PullResult.h"
+
+namespace rmq
+{
+       class MessageQueueListener;
+       class MessageQueue;
+       class PullCallback;
+
+       /**
+       * Pull Consumer
+       *
+       */
+       class MQPullConsumer : public MQConsumer
+       {
+       public:
+               virtual ~MQPullConsumer(){}
+               virtual void registerMessageQueueListener(const std::string& 
topic, MessageQueueListener* pListener)=0;
+
+               virtual PullResult* pull(MessageQueue& mq, const std::string& 
subExpression, long long offset,int maxNums)=0;
+               virtual void pull(MessageQueue& mq, const std::string& 
subExpression, long long offset, int maxNums, PullCallback* pPullCallback)=0;
+
+               virtual PullResult* pullBlockIfNotFound(MessageQueue& mq, const 
std::string& subExpression, long long offset, int maxNums)=0;
+               virtual void pullBlockIfNotFound(MessageQueue& mq, const 
std::string& subExpression, long long offset, int maxNums, PullCallback* 
pPullCallback)=0;
+
+               virtual void updateConsumeOffset(MessageQueue& mq, long long 
offset)=0;
+               virtual long long fetchConsumeOffset(MessageQueue& mq, bool 
fromStore)=0;
+
+               virtual std::set<MessageQueue>* 
fetchMessageQueuesInBalance(const std::string& topic)=0;
+       };
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MQPushConsumer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MQPushConsumer.h 
b/rocketmq-client4cpp/include/MQPushConsumer.h
new file mode 100755
index 0000000..fe6d4a0
--- /dev/null
+++ b/rocketmq-client4cpp/include/MQPushConsumer.h
@@ -0,0 +1,49 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_MQPUSHCONSUMER_H__
+#define __RMQ_MQPUSHCONSUMER_H__
+
+#include <set>
+#include <string>
+
+#include "RocketMQClient.h"
+#include "MQConsumer.h"
+#include "PullResult.h"
+
+namespace rmq
+{
+       class MessageListener;
+
+       /**
+       * Push Consumer
+       *
+       */
+       class MQPushConsumer : public MQConsumer
+       {
+       public:
+               virtual void registerMessageListener(MessageListener* 
pMessageListener)=0;
+
+
+               virtual  void subscribe(const std::string& topic, const 
std::string& subExpression)=0;
+               virtual void unsubscribe(const std::string& topic)=0;
+
+
+               virtual void updateCorePoolSize(int corePoolSize)=0;
+               virtual void suspend()=0;
+               virtual void resume()=0;
+       };
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/Message.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/Message.h 
b/rocketmq-client4cpp/include/Message.h
new file mode 100755
index 0000000..441b4e5
--- /dev/null
+++ b/rocketmq-client4cpp/include/Message.h
@@ -0,0 +1,136 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_MESSAGE_H__
+#define __RMQ_MESSAGE_H__
+
+#include <map>
+#include <string>
+#include <list>
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+       /**
+       * Message
+       *
+       */
+       class Message
+       {
+       public:
+               Message();
+               Message(const std::string& topic, const char* body,int len);
+               Message(const std::string& topic, const std::string& tags, 
const char* body,int len);
+               Message(const std::string& topic, const std::string& tags,const 
std::string& keys, const char* body,int len);
+               Message(const std::string& topic,
+                               const std::string& tags,
+                               const std::string& keys,
+                               const int       flag,
+                               const char* body,
+                               int len,
+                               bool waitStoreMsgOK);
+
+               virtual ~Message();
+               Message(const Message& other);
+               Message& operator=(const Message& other);
+
+               void clearProperty(const std::string& name);
+               void putProperty(const std::string& name, const std::string& 
value);
+               std::string getProperty(const std::string& name);
+
+               std::string getTopic()const;
+               void setTopic(const std::string& topic);
+
+               std::string getTags();
+               void setTags(const std::string& tags);
+
+               std::string getKeys();
+               void setKeys(const std::string& keys);
+               void setKeys(const std::list<std::string> keys);
+
+               int getDelayTimeLevel();
+               void setDelayTimeLevel(int level);
+
+               bool isWaitStoreMsgOK();
+               void setWaitStoreMsgOK(bool waitStoreMsgOK);
+
+               int getFlag();
+               void setFlag(int flag);
+
+               const char* getBody() const;
+               int getBodyLen() const;
+               void setBody(const char* body, int len);
+
+               bool tryToCompress(int compressLevel);
+               const char* getCompressBody() const;
+               int getCompressBodyLen() const;
+
+               std::map<std::string, std::string>& getProperties();
+               void setProperties(const std::map<std::string, std::string>& 
properties);
+
+               std::string toString() const;
+
+       protected:
+               void Init(const std::string& topic,
+                                 const std::string& tags,
+                                 const std::string& keys,
+                                 const int     flag,
+                                 const char* body,
+                                 int len,
+                                 bool waitStoreMsgOK);
+
+       public:
+               static const std::string PROPERTY_KEYS;
+               static const std::string PROPERTY_TAGS;
+               static const std::string PROPERTY_WAIT_STORE_MSG_OK;
+               static const std::string PROPERTY_DELAY_TIME_LEVEL;
+
+               /**
+               * for inner use
+               */
+               static const std::string PROPERTY_RETRY_TOPIC;
+               static const std::string PROPERTY_REAL_TOPIC;
+               static const std::string PROPERTY_REAL_QUEUE_ID;
+               static const std::string PROPERTY_TRANSACTION_PREPARED;
+               static const std::string PROPERTY_PRODUCER_GROUP;
+               static const std::string PROPERTY_MIN_OFFSET;
+               static const std::string PROPERTY_MAX_OFFSET;
+               static const std::string PROPERTY_BUYER_ID;
+               static const std::string PROPERTY_ORIGIN_MESSAGE_ID;
+               static const std::string PROPERTY_TRANSFER_FLAG;
+               static const std::string PROPERTY_CORRECTION_FLAG;
+               static const std::string PROPERTY_MQ2_FLAG;
+               static const std::string PROPERTY_RECONSUME_TIME;
+               static const std::string PROPERTY_MSG_REGION;
+               static const std::string PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX;
+               static const std::string PROPERTY_MAX_RECONSUME_TIMES;
+               static const std::string PROPERTY_CONSUME_START_TIMESTAMP;
+
+               static const std::string KEY_SEPARATOR;
+       private:
+               std::string m_topic;
+               int m_flag;
+               std::map<std::string, std::string> m_properties;
+
+               char* m_body;
+               int   m_bodyLen;
+
+               char* m_compressBody;
+               int   m_compressBodyLen;
+       };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageExt.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageExt.h 
b/rocketmq-client4cpp/include/MessageExt.h
new file mode 100755
index 0000000..f70041c
--- /dev/null
+++ b/rocketmq-client4cpp/include/MessageExt.h
@@ -0,0 +1,108 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_MESSAGEEXT_H__
+#define __RMQ_MESSAGEEXT_H__
+
+#include <sys/socket.h>
+#include <string>
+#include "Message.h"
+#include "TopicFilterType.h"
+#include "RocketMQClient.h"
+
+namespace rmq
+       {
+       /**
+       * Message extend
+       *
+       */
+       class MessageExt : public Message
+       {
+       public:
+               MessageExt();
+
+               MessageExt(int queueId,
+                                  long long bornTimestamp,
+                                  sockaddr bornHost,
+                                  long long storeTimestamp,
+                                  sockaddr storeHost,
+                                  std::string msgId);
+
+               ~MessageExt();
+
+               static TopicFilterType parseTopicFilterType(int sysFlag);
+
+               int getQueueId();
+               void setQueueId(int queueId);
+
+               long long getBornTimestamp();
+               void setBornTimestamp(long long bornTimestamp);
+
+               sockaddr getBornHost();
+               std::string getBornHostString();
+               std::string getBornHostNameString();
+               void setBornHost(const sockaddr& bornHost);
+
+               long long getStoreTimestamp();
+               void setStoreTimestamp(long long storeTimestamp);
+
+               sockaddr getStoreHost();
+               std::string getStoreHostString();
+               void setStoreHost(const sockaddr& storeHost);
+
+               std::string getMsgId();
+               void setMsgId(const std::string& msgId);
+
+               int getSysFlag();
+               void setSysFlag(int sysFlag);
+
+               int getBodyCRC();
+               void setBodyCRC(int bodyCRC);
+
+               long long getQueueOffset();
+               void setQueueOffset(long long queueOffset);
+
+               long long getCommitLogOffset();
+               void setCommitLogOffset(long long physicOffset);
+
+               int getStoreSize();
+               void setStoreSize(int storeSize);
+
+               int getReconsumeTimes();
+               void setReconsumeTimes(int reconsumeTimes);
+
+               long long getPreparedTransactionOffset();
+               void setPreparedTransactionOffset(long long 
preparedTransactionOffset);
+
+               std::string toString() const;
+
+       private:
+               long long m_queueOffset;
+               long long m_commitLogOffset;
+               long long m_bornTimestamp;
+               long long m_storeTimestamp;
+               long long m_preparedTransactionOffset;
+               int m_queueId;
+               int m_storeSize;
+               int m_sysFlag;
+               int m_bodyCRC;
+               int m_reconsumeTimes;
+               sockaddr m_bornHost;
+               sockaddr m_storeHost;
+               std::string m_msgId;
+       };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageListener.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageListener.h 
b/rocketmq-client4cpp/include/MessageListener.h
new file mode 100755
index 0000000..130a219
--- /dev/null
+++ b/rocketmq-client4cpp/include/MessageListener.h
@@ -0,0 +1,94 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_MESSAGELISTENER_H__
+#define __RMQ_MESSAGELISTENER_H__
+
+#include <limits.h>
+#include <list>
+
+#include "MessageExt.h"
+#include "MessageQueue.h"
+
+namespace rmq
+{
+       /**
+       * Message Listener
+       *
+       */
+       class MessageListener
+       {
+       public:
+               virtual ~MessageListener(){}
+       };
+
+       enum ConsumeOrderlyStatus
+       {
+               SUCCESS,
+               ROLLBACK,
+               COMMIT,
+               SUSPEND_CURRENT_QUEUE_A_MOMENT,
+       };
+
+       typedef struct tagConsumeOrderlyContext
+       {
+               tagConsumeOrderlyContext(MessageQueue& mq)
+                       :messageQueue(mq),
+                       autoCommit(true),
+                       suspendCurrentQueueTimeMillis(1000)
+               {
+               }
+
+               MessageQueue messageQueue;///< 
Ҫ���ѵ���Ϣ�����ĸ�����
+               bool autoCommit;///< ��ϢOffset�Ƿ��Զ��ύ
+               long suspendCurrentQueueTimeMillis;
+       }ConsumeOrderlyContext;
+
+       class MessageListenerOrderly : public MessageListener
+       {
+       public:
+               virtual ConsumeOrderlyStatus 
consumeMessage(std::list<MessageExt*>& msgs,
+                                                                               
                        ConsumeOrderlyContext& context)=0;
+       };
+
+       enum ConsumeConcurrentlyStatus
+       {
+               CONSUME_SUCCESS,
+               RECONSUME_LATER,
+       };
+
+       struct ConsumeConcurrentlyContext
+       {
+               ConsumeConcurrentlyContext(MessageQueue& mq)
+                       :messageQueue(mq),
+                       delayLevelWhenNextConsume(0),
+                       ackIndex(INT_MAX)
+               {
+               }
+               MessageQueue messageQueue;
+               int delayLevelWhenNextConsume;
+               int ackIndex;
+       };
+
+       class MessageListenerConcurrently : public MessageListener
+       {
+       public:
+               virtual ConsumeConcurrentlyStatus 
consumeMessage(std::list<MessageExt*>& msgs,
+                                                                               
                                ConsumeConcurrentlyContext& context)=0;
+       };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageQueue.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageQueue.h 
b/rocketmq-client4cpp/include/MessageQueue.h
new file mode 100755
index 0000000..89ddf58
--- /dev/null
+++ b/rocketmq-client4cpp/include/MessageQueue.h
@@ -0,0 +1,70 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_MESSAGEQUEUE_H__
+#define __RMQ_MESSAGEQUEUE_H__
+
+#include <iostream>
+#include <string>
+#include <sstream>
+
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+       /**
+       * Message Queue
+       *
+       */
+       class MessageQueue
+       {
+       public:
+               MessageQueue();
+               ~MessageQueue(){};
+
+               MessageQueue(const std::string& topic, const std::string& 
brokerName, int queueId);
+
+               std::string getTopic()const;
+               void setTopic(const std::string& topic);
+
+               std::string getBrokerName()const;
+               void setBrokerName(const std::string& brokerName);
+
+               int getQueueId()const;
+               void setQueueId(int queueId);
+
+               int hashCode();
+               std::string toString() const;
+               std::string toJsonString() const;
+
+               bool operator==(const MessageQueue& mq) const;
+               bool operator<(const MessageQueue& mq) const;
+               int compareTo(const MessageQueue& mq) const;
+
+       private:
+               std::string m_topic;
+               std::string m_brokerName;
+               int m_queueId;
+       };
+
+       inline std::ostream& operator<<(std::ostream& os, const MessageQueue& 
obj)
+       {
+           os << obj.toString();
+           return os;
+       }
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/MessageQueueListener.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/MessageQueueListener.h 
b/rocketmq-client4cpp/include/MessageQueueListener.h
new file mode 100755
index 0000000..9f04c3e
--- /dev/null
+++ b/rocketmq-client4cpp/include/MessageQueueListener.h
@@ -0,0 +1,38 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RMQ_MESSAGEQUEUELISTENER_H__
+#define __RMQ_MESSAGEQUEUELISTENER_H__
+
+#include <set>
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+       /**
+        * Message Queue Listener
+        *
+        */
+       class MessageQueueListener
+       {
+       public:
+               virtual ~MessageQueueListener() {}
+               virtual void messageQueueChanged(const std::string& topic,
+                       std::set<MessageQueue>& mqAll,
+                       std::set<MessageQueue>& mqDivided)=0;
+       };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/OffsetStore.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/OffsetStore.h 
b/rocketmq-client4cpp/include/OffsetStore.h
new file mode 100755
index 0000000..a533750
--- /dev/null
+++ b/rocketmq-client4cpp/include/OffsetStore.h
@@ -0,0 +1,58 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_OFFSETSTORE_H__
+#define __RMQ_OFFSETSTORE_H__
+
+#include <set>
+#include <map>
+
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+       class MessageQueue;
+
+       enum ReadOffsetType
+       {
+               READ_FROM_MEMORY,
+               READ_FROM_STORE,
+               MEMORY_FIRST_THEN_STORE,
+       };
+
+       /**
+       * Consumer Offset Store
+       *
+       */
+       class OffsetStore
+       {
+       public:
+               virtual ~OffsetStore() {}
+
+               virtual void load()=0;
+
+               virtual void updateOffset(const MessageQueue& mq, long long 
offset, bool increaseOnly)=0;
+               virtual long long readOffset(const MessageQueue& mq, 
ReadOffsetType type)=0;
+
+               virtual void persistAll(std::set<MessageQueue>& mqs)=0;
+               virtual void persist(const MessageQueue& mq)=0;
+
+               virtual void removeOffset(const MessageQueue& mq)=0;
+
+               virtual std::map<MessageQueue, long long> 
cloneOffsetTable(const std::string& topic) = 0;
+       };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/PullCallback.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/PullCallback.h 
b/rocketmq-client4cpp/include/PullCallback.h
new file mode 100755
index 0000000..47ade68
--- /dev/null
+++ b/rocketmq-client4cpp/include/PullCallback.h
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RMQ_PULLCALLBACK_H__
+#define __RMQ_PULLCALLBACK_H__
+
+#include "RocketMQClient.h"
+#include "PullResult.h"
+
+namespace rmq
+{
+       class MQException;
+
+       /**
+        * PullCallback
+        *
+        */
+       class PullCallback
+       {
+       public:
+               virtual ~PullCallback() {}
+               virtual void onSuccess(PullResult& pullResult)=0;
+               virtual void onException(MQException& e)=0;
+       };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/PullResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/PullResult.h 
b/rocketmq-client4cpp/include/PullResult.h
new file mode 100755
index 0000000..42c13ca
--- /dev/null
+++ b/rocketmq-client4cpp/include/PullResult.h
@@ -0,0 +1,91 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_PULLRESULT_H__
+#define __RMQ_PULLRESULT_H__
+
+#include <list>
+#include <string>
+#include <sstream>
+
+#include "RocketMQClient.h"
+#include "MessageExt.h"
+
+namespace rmq
+{
+       enum PullStatus
+       {
+               FOUND,
+               NO_NEW_MSG,
+               NO_MATCHED_MSG,
+               OFFSET_ILLEGAL
+       };
+
+       /**
+       * PullResult
+       *
+       */
+       struct PullResult
+       {
+               PullResult()
+               {
+
+               }
+
+               PullResult(PullStatus pullStatus,
+                                  long long nextBeginOffset,
+                                  long long minOffset,
+                                  long long maxOffset,
+                                  std::list<MessageExt*>& msgFoundList)
+                       :pullStatus(pullStatus),
+                        nextBeginOffset(nextBeginOffset),
+                        minOffset(minOffset),
+                        maxOffset(maxOffset),
+                        msgFoundList(msgFoundList)
+               {
+
+               }
+
+               ~PullResult()
+               {
+                       std::list<MessageExt*>::iterator it = 
msgFoundList.begin();
+
+                       for (;it!=msgFoundList.end();it++)
+                       {
+                               delete *it;
+                       }
+               }
+
+               std::string toString() const
+               {
+                       std::stringstream ss;
+                       ss      << "{pullStatus=" << pullStatus
+                               << ",nextBeginOffset=" << nextBeginOffset
+                               << ",minOffset=" << nextBeginOffset
+                               << ",maxOffset=" << nextBeginOffset
+                               << ",msgFoundList.size=" << msgFoundList.size()
+                               <<"}";
+                       return ss.str();
+               }
+
+               PullStatus pullStatus;
+               long long nextBeginOffset;
+               long long minOffset;
+               long long maxOffset;
+               std::list<MessageExt*> msgFoundList;
+       };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/QueryResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/QueryResult.h 
b/rocketmq-client4cpp/include/QueryResult.h
new file mode 100644
index 0000000..13164e4
--- /dev/null
+++ b/rocketmq-client4cpp/include/QueryResult.h
@@ -0,0 +1,56 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __RMQ_QUERYRESULT_H__
+#define  __RMQ_QUERYRESULT_H__
+
+#include <list>
+
+#include "RocketMQClient.h"
+#include "MessageExt.h"
+
+namespace rmq
+{
+       /**
+       * QueryResult
+       *
+       */
+       class QueryResult
+       {
+       public:
+               QueryResult(long long indexLastUpdateTimestamp, const 
std::list<MessageExt*>& messageList)
+               {
+                       m_indexLastUpdateTimestamp = indexLastUpdateTimestamp;
+                       m_messageList = messageList;
+               }
+
+               long long getIndexLastUpdateTimestamp()
+               {
+                       return m_indexLastUpdateTimestamp;
+               }
+
+               std::list<MessageExt*>& getMessageList()
+               {
+                       return m_messageList;
+               }
+
+       private:
+               long long m_indexLastUpdateTimestamp;
+               std::list<MessageExt*> m_messageList;
+       };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/RocketMQClient.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/RocketMQClient.h 
b/rocketmq-client4cpp/include/RocketMQClient.h
new file mode 100755
index 0000000..e4c71c9
--- /dev/null
+++ b/rocketmq-client4cpp/include/RocketMQClient.h
@@ -0,0 +1,100 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_ROCKETMQCLIENT_H__
+#define __RMQ_ROCKETMQCLIENT_H__
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <assert.h>
+#include <time.h>
+#include <stdarg.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <signal.h>
+#include <pthread.h>
+
+#include <sys/time.h>
+#include <sys/timeb.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <sys/file.h>
+#include <sys/syscall.h>
+#include <linux/unistd.h>
+
+#include <cstdio>
+#include <iostream>
+#include <string>
+#include <sstream>
+#include <vector>
+#include <map>
+#include <set>
+
+
+class RocketMQUtil
+{
+public:
+       enum
+    {
+        NONE_LOG    = 0,
+        ERROR_LOG   = 1,
+        WARN_LOG    = 2,
+        INFO_LOG    = 3,
+        DEBUG_LOG   = 4,
+    };
+
+public:
+       static pid_t getPid();
+       static pid_t getTid();
+
+       static int getDiffDays(time_t tmFirst, time_t tmSecond);
+       static std::string tm2str(const time_t &t, const std::string &sFormat);
+       static std::string now2str(const std::string &sFormat);
+       static std::string now2str();
+       static int64_t getNowMs();
+       static std::string str2fmt(const char* format, 
...)__attribute__((format(__printf__,1,2)));
+
+       static int initLog(const std::string& sLogPath);
+       static void setLogLevel(int logLevel);
+       static void writeLog(const char* fmt, ...) 
__attribute__((format(__printf__,1,2)));
+       static inline bool isNeedLog(int level)
+       {
+               return (level <= _logLevel);
+       };
+
+public:
+       static volatile int _logFd;
+       static int _logLevel;
+       static std::string _logPath;
+};
+
+#define RMQ_AUTO(name, value) typeof(value) name = value
+#define RMQ_FOR_EACH(container, it) \
+    for(typeof((container).begin()) it = 
(container).begin();it!=(container).end(); ++it)
+
+
+
+#define RMQ_DEBUG(fmt, args...)        do{ 
if(RocketMQUtil::isNeedLog(RocketMQUtil::DEBUG_LOG)) 
RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][DEBUG]|"fmt"\n", 
RocketMQUtil::getPid(), RocketMQUtil::getTid(), 
RocketMQUtil::now2str().c_str(), __FILE__, __func__,__LINE__, ##args);}while(0)
+#define RMQ_INFO(fmt, args...) do{ 
if(RocketMQUtil::isNeedLog(RocketMQUtil::INFO_LOG))  
RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][INFO]|"fmt"\n", 
RocketMQUtil::getPid(), RocketMQUtil::getTid(), 
RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0)
+#define RMQ_WARN(fmt, args...) do{ 
if(RocketMQUtil::isNeedLog(RocketMQUtil::WARN_LOG))  
RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][WARN]|"fmt"\n", 
RocketMQUtil::getPid(), RocketMQUtil::getTid(), 
RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0)
+#define RMQ_ERROR(fmt, args...)        do{ 
if(RocketMQUtil::isNeedLog(RocketMQUtil::ERROR_LOG)) 
RocketMQUtil::writeLog("%d-%d|[%s][%s:%s:%d][ERROR]|"fmt"\n", 
RocketMQUtil::getPid(), RocketMQUtil::getTid(), 
RocketMQUtil::now2str().c_str(), __FILE__, __func__, __LINE__, ##args);}while(0)
+
+#define RMQ_PRINT(fmt, args...)        do{ 
printf("%d|[%s][%s:%s:%d][DEBUG]|"fmt"\n", RocketMQUtil::getTid(), 
RocketMQUtil::now2str().c_str(), __FILE__, __func__,__LINE__, ##args);}while(0)
+
+
+#endif
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/SendCallback.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/SendCallback.h 
b/rocketmq-client4cpp/include/SendCallback.h
new file mode 100755
index 0000000..0feb5a1
--- /dev/null
+++ b/rocketmq-client4cpp/include/SendCallback.h
@@ -0,0 +1,39 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_SENDCALLBACK_H__
+#define __RMQ_SENDCALLBACK_H__
+
+#include "SendResult.h"
+#include "RocketMQClient.h"
+
+namespace rmq
+{
+       class MQException;
+
+       /**
+       * Send Mesage Callback
+       *
+       */
+       class SendCallback
+       {
+       public:
+               virtual ~SendCallback() {}
+               virtual void onSuccess(SendResult& sendResult)=0;
+               virtual void onException(MQException& e)=0;
+       };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/SendMessageHook.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/SendMessageHook.h 
b/rocketmq-client4cpp/include/SendMessageHook.h
new file mode 100644
index 0000000..9869aa6
--- /dev/null
+++ b/rocketmq-client4cpp/include/SendMessageHook.h
@@ -0,0 +1,50 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_SENDMESSAGEHOOK_H__
+#define __RMQ_SENDMESSAGEHOOK_H__
+
+#include <string>
+
+#include "RocketMQClient.h"
+#include "Message.h"
+#include "MQClientException.h"
+
+namespace rmq
+{
+       class SendMessageContext
+       {
+       public:
+               std::string producerGroup;
+               Message msg;
+               MessageQueue mq;
+               std::string brokerAddr;
+               CommunicationMode communicationMode;
+               SendResult sendResult;
+               MQException* pException;
+               void* pArg;
+       };
+
+       class SendMessageHook
+       {
+       public:
+               virtual ~SendMessageHook() {}
+               virtual std::string hookName()=0;
+               virtual void sendMessageBefore(const SendMessageContext& 
context)=0;
+               virtual void sendMessageAfter(const SendMessageContext& 
context)=0;
+       };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/SendResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/SendResult.h 
b/rocketmq-client4cpp/include/SendResult.h
new file mode 100755
index 0000000..d6a3174
--- /dev/null
+++ b/rocketmq-client4cpp/include/SendResult.h
@@ -0,0 +1,89 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __RMQ_SENDRESULT_H__
+#define __RMQ_SENDRESULT_H__
+
+#include "RocketMQClient.h"
+#include "MessageQueue.h"
+
+namespace rmq
+{
+       enum SendStatus
+       {
+               SEND_OK,
+               FLUSH_DISK_TIMEOUT,
+               FLUSH_SLAVE_TIMEOUT,
+               SLAVE_NOT_AVAILABLE
+       };
+
+       /**
+       * Send Message Result
+       *
+       */
+       class SendResult
+       {
+       public:
+               SendResult();
+               SendResult(const SendStatus& sendStatus,
+                       const std::string&  msgId,
+                       MessageQueue& messageQueue,
+                       long long queueOffset,
+                       std::string&  projectGroupPrefix);
+
+               const std::string&  getMsgId();
+               void setMsgId(const std::string&  msgId);
+               SendStatus getSendStatus();
+               void setSendStatus(const SendStatus& sendStatus);
+               MessageQueue& getMessageQueue();
+               void setMessageQueue(MessageQueue& messageQueue);
+               long long getQueueOffset();
+               void setQueueOffset(long long queueOffset);
+               bool hasResult();
+
+               std::string toString() const;
+               std::string toJsonString() const;
+
+       private:
+               SendStatus m_sendStatus;
+               std::string m_msgId;
+               MessageQueue m_messageQueue;
+               long long m_queueOffset;
+       };
+
+       enum LocalTransactionState
+       {
+               COMMIT_MESSAGE,
+               ROLLBACK_MESSAGE,
+               UNKNOW,
+       };
+
+       /**
+       * Send transaction message result
+       *
+       */
+       class TransactionSendResult : public SendResult
+       {
+       public:
+               TransactionSendResult();
+               LocalTransactionState getLocalTransactionState();
+               void setLocalTransactionState(LocalTransactionState 
localTransactionState);
+
+       private:
+               LocalTransactionState m_localTransactionState;
+       };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/include/TopicFilterType.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/include/TopicFilterType.h 
b/rocketmq-client4cpp/include/TopicFilterType.h
new file mode 100755
index 0000000..e51ae20
--- /dev/null
+++ b/rocketmq-client4cpp/include/TopicFilterType.h
@@ -0,0 +1,32 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __RMQ_TOPICFILTERTYPE_H__
+#define __RMQ_TOPICFILTERTYPE_H__
+
+namespace rmq
+{
+       /**
+        * Topic filter type
+        *
+        */
+       enum TopicFilterType
+       {
+               SINGLE_TAG,
+               MULTI_TAG
+       };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/rocketmq.mk
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/rocketmq.mk b/rocketmq-client4cpp/rocketmq.mk
new file mode 100644
index 0000000..eecc458
--- /dev/null
+++ b/rocketmq-client4cpp/rocketmq.mk
@@ -0,0 +1,6 @@
+ROCKETMQ_PATH := /data/libs/rocketmq
+
+INCLUDE += -I$(ROCKETMQ_PATH)/include 
+INCLUDE_32 += -I$(ROCKETMQ_PATH)/include -march=i686
+LIB_32 += -L$(ROCKETMQ_PATH)/lib32 -lrocketmq -lz -lrt -lpthread
+LIB_64 += -L$(ROCKETMQ_PATH)/lib64 -lrocketmq -lz -lrt -lpthread

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/ClientConfig.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/ClientConfig.cpp 
b/rocketmq-client4cpp/src/ClientConfig.cpp
new file mode 100755
index 0000000..986d67d
--- /dev/null
+++ b/rocketmq-client4cpp/src/ClientConfig.cpp
@@ -0,0 +1,168 @@
+/**
+ * Copyright (C) 2010-2013 kangliqiang, [email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdlib.h>
+#include <sstream>
+
+#include "MQClientException.h"
+#include "SocketUtil.h"
+#include "ClientConfig.h"
+#include "UtilAll.h"
+#include "MixAll.h"
+
+namespace rmq
+{
+
+ClientConfig::ClientConfig()
+{
+    char* addr = getenv(MixAll::NAMESRV_ADDR_ENV.c_str());
+    if (addr)
+    {
+        m_namesrvAddr = addr;
+    }
+    else
+    {
+        m_namesrvAddr = "";
+    }
+
+    m_clientIP = getLocalAddress();
+    m_instanceName = "DEFAULT";
+    m_clientCallbackExecutorThreads = UtilAll::availableProcessors();
+    m_pollNameServerInterval = 1000 * 30;
+    m_heartbeatBrokerInterval = 1000 * 30;
+    m_persistConsumerOffsetInterval = 1000 * 5;
+}
+
+ClientConfig::~ClientConfig()
+{
+}
+
+std::string ClientConfig::buildMQClientId()
+{
+    return m_clientIP + "@" + m_instanceName;
+}
+
+void ClientConfig::changeInstanceNameToPID()
+{
+    if (m_instanceName == "DEFAULT")
+    {
+        m_instanceName = UtilAll::toString(UtilAll::getPid());
+    }
+}
+
+
+void ClientConfig::resetClientConfig(const ClientConfig& cc)
+{
+    m_namesrvAddr = cc.m_namesrvAddr;
+    m_clientIP = cc.m_clientIP;
+    m_instanceName = cc.m_instanceName;
+    m_clientCallbackExecutorThreads = cc.m_clientCallbackExecutorThreads;
+    m_pollNameServerInterval = cc.m_pollNameServerInterval;
+    m_heartbeatBrokerInterval = cc.m_heartbeatBrokerInterval;
+    m_persistConsumerOffsetInterval = cc.m_persistConsumerOffsetInterval;
+}
+
+ClientConfig ClientConfig::cloneClientConfig()
+{
+    return *this;
+}
+
+std::string ClientConfig::getNamesrvAddr()
+{
+    return m_namesrvAddr;
+}
+
+void ClientConfig::setNamesrvAddr(const std::string& namesrvAddr)
+{
+    m_namesrvAddr = namesrvAddr;
+}
+
+std::string ClientConfig::getClientIP()
+{
+    return m_clientIP;
+}
+
+void ClientConfig::setClientIP(const std::string& clientIP)
+{
+    m_clientIP = clientIP;
+}
+
+std::string ClientConfig::getInstanceName()
+{
+    return m_instanceName;
+}
+
+void ClientConfig::setInstanceName(const std::string& instanceName)
+{
+    m_instanceName = instanceName;
+}
+
+int ClientConfig::getClientCallbackExecutorThreads()
+{
+    return m_clientCallbackExecutorThreads;
+}
+
+void ClientConfig::setClientCallbackExecutorThreads(int 
clientCallbackExecutorThreads)
+{
+    m_clientCallbackExecutorThreads = clientCallbackExecutorThreads;
+}
+
+int ClientConfig::getPollNameServerInterval()
+{
+    return m_pollNameServerInterval;
+}
+
+void ClientConfig::setPollNameServerInterval(int pollNameServerInterval)
+{
+    m_pollNameServerInterval = pollNameServerInterval;
+}
+
+int ClientConfig::getHeartbeatBrokerInterval()
+{
+    return m_heartbeatBrokerInterval;
+}
+
+void ClientConfig::setHeartbeatBrokerInterval(int heartbeatBrokerInterval)
+{
+    m_heartbeatBrokerInterval = heartbeatBrokerInterval;
+}
+
+int ClientConfig:: getPersistConsumerOffsetInterval()
+{
+    return m_persistConsumerOffsetInterval;
+}
+
+void ClientConfig::setPersistConsumerOffsetInterval(int 
persistConsumerOffsetInterval)
+{
+    m_persistConsumerOffsetInterval = persistConsumerOffsetInterval;
+}
+
+
+std::string ClientConfig::toString() const
+{
+       std::stringstream ss;
+       ss      << "{namesrvAddr=" << m_namesrvAddr
+               << ",clientIP=" << m_clientIP
+               << ",instanceName=" << m_instanceName
+               << ",clientCallbackExecutorThreads=" << 
m_clientCallbackExecutorThreads
+               << ",pollNameServerInteval=" << m_pollNameServerInterval
+               << ",heartbeatBrokerInterval=" << m_heartbeatBrokerInterval
+               << ",persistConsumerOffsetInterval=" << 
m_persistConsumerOffsetInterval
+               <<"}";
+       return ss.str();
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp 
b/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp
new file mode 100755
index 0000000..ae88de5
--- /dev/null
+++ b/rocketmq-client4cpp/src/ClientRemotingProcessor.cpp
@@ -0,0 +1,154 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "ClientRemotingProcessor.h"
+#include "MQProtos.h"
+#include "TcpTransport.h"
+#include "RemotingCommand.h"
+#include "MQClientFactory.h"
+#include "CommandCustomHeader.h"
+#include "ConsumerRunningInfo.h"
+
+
+
+namespace rmq
+{
+
+ClientRemotingProcessor::ClientRemotingProcessor(MQClientFactory* 
pMQClientFactory)
+    : m_pMQClientFactory(pMQClientFactory)
+{
+
+}
+
+RemotingCommand* ClientRemotingProcessor::processRequest(TcpTransport* pTts, 
RemotingCommand* pRequest)
+{
+    int code = pRequest->getCode();
+    switch (code)
+    {
+        case CHECK_TRANSACTION_STATE_VALUE:
+            return checkTransactionState(pTts, pRequest);
+        case NOTIFY_CONSUMER_IDS_CHANGED_VALUE:
+            return notifyConsumerIdsChanged(pTts, pRequest);
+        case RESET_CONSUMER_CLIENT_OFFSET_VALUE:
+            return resetOffset(pTts, pRequest);
+        case GET_CONSUMER_STATUS_FROM_CLIENT_VALUE:
+            return getConsumeStatus(pTts, pRequest);
+        case GET_CONSUMER_RUNNING_INFO_VALUE:
+            return getConsumerRunningInfo(pTts, pRequest);
+        case CONSUME_MESSAGE_DIRECTLY_VALUE:
+            return consumeMessageDirectly(pTts, pRequest);
+        default:
+            break;
+    }
+
+    return NULL;
+}
+
+RemotingCommand* ClientRemotingProcessor::checkTransactionState(TcpTransport* 
pTts, RemotingCommand* pRequest)
+{
+    //TODO
+    return NULL;
+}
+
+RemotingCommand* 
ClientRemotingProcessor::notifyConsumerIdsChanged(TcpTransport* pTts, 
RemotingCommand* pRequest)
+{
+    try
+    {
+        NotifyConsumerIdsChangedRequestHeader* extHeader = 
(NotifyConsumerIdsChangedRequestHeader*)pRequest->getCommandCustomHeader();
+        RMQ_INFO("receive broker's notification[{%s}], the consumer group: 
{%s} changed, rebalance immediately",
+                pTts->getServerAddr().c_str(),
+                extHeader->consumerGroup.c_str());
+        m_pMQClientFactory->rebalanceImmediately();
+    }
+    catch (std::exception& e)
+    {
+        RMQ_ERROR("notifyConsumerIdsChanged exception: %s", e.what());
+    }
+
+    return NULL;
+}
+
+RemotingCommand* ClientRemotingProcessor::resetOffset(TcpTransport* pTts, 
RemotingCommand* pRequest)
+{
+    //TODO
+    return NULL;
+}
+
+
+RemotingCommand* ClientRemotingProcessor::getConsumeStatus(TcpTransport* pTts, 
RemotingCommand* pRequest)
+{
+    //TODO
+    return NULL;
+}
+
+
+RemotingCommand* ClientRemotingProcessor::getConsumerRunningInfo(TcpTransport* 
pTts, RemotingCommand* pRequest)
+{
+       return NULL;
+
+       /*
+    GetConsumerRunningInfoRequestHeader* requestHeader = 
(GetConsumerRunningInfoRequestHeader)pRequest->getCommandCustomHeader();
+       RemotingCommand* pResponse = 
RemotingCommand::createResponseCommand(NULL);
+
+       pResponse = RemotingCommand::createResponseCommand(
+                                       REQUEST_CODE_NOT_SUPPORTED_VALUE, 
"request type not supported", NULL);
+       pResponse->setOpaque(pCmd->getOpaque());
+
+    ConsumerRunningInfo* consumerRunningInfo = 
m_pMQClientFactory->consumerRunningInfo(requestHeader->consumerGroup);
+    if (NULL != consumerRunningInfo) {
+        response.setCode(ResponseCode.SUCCESS);
+        response.setBody(consumerRunningInfo.encode());
+    } else {
+        response.setCode(ResponseCode.SYSTEM_ERROR);
+        response.setRemark(String.format("The Consumer Group <%s> not exist in 
this consumer",
+                requestHeader.getConsumerGroup()));
+    }
+    return pResponse;
+
+       // java
+    final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
+    final GetConsumerRunningInfoRequestHeader requestHeader =
+            (GetConsumerRunningInfoRequestHeader) request
+                    
.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);
+
+    ConsumerRunningInfo consumerRunningInfo =
+            
this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());
+    if (null != consumerRunningInfo) {
+        if (requestHeader.isJstackEnable()) {
+            String jstack = UtilAll.jstack();
+            consumerRunningInfo.setJstack(jstack);
+        }
+
+        response.setCode(ResponseCode.SUCCESS);
+        response.setBody(consumerRunningInfo.encode());
+    } else {
+        response.setCode(ResponseCode.SYSTEM_ERROR);
+        response.setRemark(String.format("The Consumer Group <%s> not exist in 
this consumer",
+                requestHeader.getConsumerGroup()));
+    }
+
+    return response;
+    */
+}
+
+
+RemotingCommand* ClientRemotingProcessor::consumeMessageDirectly(TcpTransport* 
pTts, RemotingCommand* pRequest)
+{
+    //TODO
+    return NULL;
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/ClientRemotingProcessor.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/ClientRemotingProcessor.h 
b/rocketmq-client4cpp/src/ClientRemotingProcessor.h
new file mode 100755
index 0000000..4cd2873
--- /dev/null
+++ b/rocketmq-client4cpp/src/ClientRemotingProcessor.h
@@ -0,0 +1,45 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __CLIENTREMOTINGPROCESSOR_H__
+#define __CLIENTREMOTINGPROCESSOR_H__
+
+#include "TcpRequestProcessor.h"
+
+namespace rmq
+{
+       class MQClientFactory;
+       class RemotingCommand;
+
+       class ClientRemotingProcessor : public TcpRequestProcessor
+       {
+       public:
+           ClientRemotingProcessor(MQClientFactory* pMQClientFactory);
+
+           RemotingCommand* processRequest(TcpTransport* pTts, 
RemotingCommand* pRequest);
+           RemotingCommand* checkTransactionState(TcpTransport* pTts, 
RemotingCommand* pRequest);
+           RemotingCommand* notifyConsumerIdsChanged(TcpTransport* pTts, 
RemotingCommand* pRequest);
+           RemotingCommand* resetOffset(TcpTransport* pTts, RemotingCommand* 
pRequest);
+           RemotingCommand* getConsumeStatus(TcpTransport* pTts, 
RemotingCommand* pRequest);
+           RemotingCommand* getConsumerRunningInfo(TcpTransport* pTts, 
RemotingCommand* pRequest);
+           RemotingCommand* consumeMessageDirectly(TcpTransport* pTts, 
RemotingCommand* pRequest);
+
+       private:
+           MQClientFactory* m_pMQClientFactory;
+       };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/CommunicationMode.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/CommunicationMode.h 
b/rocketmq-client4cpp/src/CommunicationMode.h
new file mode 100755
index 0000000..43b2941
--- /dev/null
+++ b/rocketmq-client4cpp/src/CommunicationMode.h
@@ -0,0 +1,34 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __COMMUNICATIONMODE_H__
+#define __COMMUNICATIONMODE_H__
+
+namespace rmq
+{
+       /**
+        * Communication Mode
+        *
+        */
+       enum CommunicationMode
+       {
+           SYNC,
+           ASYNC,
+           ONEWAY
+       };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/FindBrokerResult.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/FindBrokerResult.h 
b/rocketmq-client4cpp/src/FindBrokerResult.h
new file mode 100644
index 0000000..51a9845
--- /dev/null
+++ b/rocketmq-client4cpp/src/FindBrokerResult.h
@@ -0,0 +1,28 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __FINDBROKERRESULT_H__
+#define __FINDBROKERRESULT_H__
+
+namespace rmq
+{
+       typedef struct
+       {
+           std::string brokerAddr;
+           bool slave;
+       } FindBrokerResult;
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQAdminImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQAdminImpl.cpp 
b/rocketmq-client4cpp/src/MQAdminImpl.cpp
new file mode 100755
index 0000000..2a6b597
--- /dev/null
+++ b/rocketmq-client4cpp/src/MQAdminImpl.cpp
@@ -0,0 +1,295 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include <list>
+#include "SocketUtil.h"
+#include "MQAdminImpl.h"
+#include "MQClientFactory.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientException.h"
+#include "TopicConfig.h"
+#include "TopicPublishInfo.h"
+#include "MessageId.h"
+#include "MessageDecoder.h"
+
+namespace rmq
+{
+
+
+MQAdminImpl::MQAdminImpl(MQClientFactory* pMQClientFactory)
+{
+    m_pMQClientFactory = pMQClientFactory;
+}
+
+MQAdminImpl::~MQAdminImpl()
+{
+
+}
+
+void MQAdminImpl::createTopic(const std::string& key, const std::string& 
newTopic,
+       int queueNum)
+{
+       return createTopic(key, newTopic, queueNum, 0);
+}
+
+
+void MQAdminImpl::createTopic(const std::string& key, const std::string& 
newTopic,
+       int queueNum, int topicSysFlag)
+{
+    try
+    {
+        MQClientAPIImpl* api = m_pMQClientFactory->getMQClientAPIImpl();
+        TopicRouteDataPtr topicRouteData = 
api->getTopicRouteInfoFromNameServer(key, 1000 * 3);
+
+        std::list<BrokerData> brokerDataList = 
topicRouteData->getBrokerDatas();
+        if (!brokerDataList.empty())
+        {
+            brokerDataList.sort();
+
+            MQClientException exception("", 0, "", 0);
+            bool hasException = false;
+
+            std::list<BrokerData>::iterator it = brokerDataList.begin();
+
+            for (; it != brokerDataList.end(); it++)
+            {
+                std::map<int, std::string>::iterator it1 = 
(*it).brokerAddrs.find(MixAll::MASTER_ID);
+                if (it1 != (*it).brokerAddrs.end())
+                {
+                    std::string addr = it1->second;
+
+                    TopicConfig topicConfig(newTopic);
+                    topicConfig.setReadQueueNums(queueNum);
+                    topicConfig.setWriteQueueNums(queueNum);
+                    topicConfig.setTopicSysFlag(topicSysFlag);
+
+                    try
+                    {
+                        api->createTopic(addr, key, topicConfig, 1000 * 3);
+                    }
+                    catch (MQClientException& e)
+                    {
+                        hasException = true;
+                        exception = e;
+                    }
+                }
+            }
+
+            if (hasException)
+            {
+                throw exception;
+            }
+        }
+        else
+        {
+            THROW_MQEXCEPTION(MQClientException, "Not found broker, maybe key 
is wrong", -1);
+        }
+    }
+    catch (MQClientException e)
+    {
+        THROW_MQEXCEPTION(MQClientException, "create new topic failed", -1);
+    }
+}
+
+std::vector<MessageQueue>* MQAdminImpl::fetchPublishMessageQueues(const 
std::string& topic)
+{
+    try
+    {
+        MQClientAPIImpl* api = m_pMQClientFactory->getMQClientAPIImpl();
+        TopicRouteDataPtr topicRouteData = 
api->getTopicRouteInfoFromNameServer(topic, 1000 * 3);
+
+        if (topicRouteData.ptr() != NULL)
+        {
+            TopicPublishInfoPtr topicPublishInfo =
+                MQClientFactory::topicRouteData2TopicPublishInfo(topic, 
*topicRouteData);
+            if (topicPublishInfo.ptr() != NULL && topicPublishInfo->ok())
+            {
+                std::vector<MessageQueue>* ret = new 
std::vector<MessageQueue>();
+                (*ret) = topicPublishInfo->getMessageQueueList();
+
+                               /*
+                std::vector<MessageQueue>& mqs = ;
+                std::vector<MessageQueue>::iterator it = mqs.begin();
+                for (; it != mqs.end(); it++)
+                {
+                    ret->push_back(*it);
+                }
+                */
+
+                return ret;
+            }
+        }
+    }
+    catch (MQClientException e)
+    {
+        THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for 
this topic" + topic, -1);
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "Unknow why, Can not find Message 
Queue for this topic, " + topic, -1);
+}
+
+std::set<MessageQueue>* MQAdminImpl::fetchSubscribeMessageQueues(const 
std::string& topic)
+{
+    try
+    {
+        TopicRouteDataPtr topicRouteData =
+            
m_pMQClientFactory->getMQClientAPIImpl()->getTopicRouteInfoFromNameServer(topic,
 1000 * 3);
+        if (topicRouteData.ptr() != NULL)
+        {
+            std::set<MessageQueue>* mqList =
+                MQClientFactory::topicRouteData2TopicSubscribeInfo(topic, 
*topicRouteData);
+            if (!mqList->empty())
+            {
+                return mqList;
+            }
+            else
+            {
+                THROW_MQEXCEPTION(MQClientException, "Can not find Message 
Queue for this topic" + topic, -1);
+            }
+        }
+    }
+    catch (MQClientException e)
+    {
+        THROW_MQEXCEPTION(MQClientException, "Can not find Message Queue for 
this topic" + topic, -1);
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "Unknow why, Can not find Message 
Queue for this topic: " + topic, -1);
+}
+
+long long MQAdminImpl::searchOffset(const MessageQueue& mq, long long 
timestamp)
+{
+    std::string brokerAddr = 
m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    if (brokerAddr.empty())
+    {
+        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+        brokerAddr = 
m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    }
+
+    if (!brokerAddr.empty())
+    {
+        try
+        {
+            return 
m_pMQClientFactory->getMQClientAPIImpl()->searchOffset(brokerAddr, 
mq.getTopic(),
+                    mq.getQueueId(), timestamp, 1000 * 3);
+        }
+        catch (MQClientException e)
+        {
+            THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr 
+ "] exception", -1);
+        }
+    }
+    THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + 
"] not exist", -1);
+}
+
+long long MQAdminImpl::maxOffset(const MessageQueue& mq)
+{
+    std::string brokerAddr = 
m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    if (brokerAddr.empty())
+    {
+        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+        brokerAddr = 
m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    }
+
+    if (!brokerAddr.empty())
+    {
+        try
+        {
+            return 
m_pMQClientFactory->getMQClientAPIImpl()->getMaxOffset(brokerAddr, 
mq.getTopic(),
+                    mq.getQueueId(), 1000 * 3);
+        }
+        catch (MQClientException e)
+        {
+            THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr 
+ "] exception", -1);
+        }
+    }
+    THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + 
"] not exist", -1);
+}
+
+long long MQAdminImpl::minOffset(const MessageQueue& mq)
+{
+    std::string brokerAddr = 
m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    if (brokerAddr.empty())
+    {
+        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+        brokerAddr = 
m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    }
+
+    if (!brokerAddr.empty())
+    {
+        try
+        {
+            return 
m_pMQClientFactory->getMQClientAPIImpl()->getMinOffset(brokerAddr, 
mq.getTopic(),
+                    mq.getQueueId(), 1000 * 3);
+        }
+        catch (MQClientException e)
+        {
+            THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr 
+ "] exception", -1);
+        }
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + 
"] not exist", -1);
+}
+
+long long MQAdminImpl::earliestMsgStoreTime(const MessageQueue& mq)
+{
+    std::string brokerAddr = 
m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    if (brokerAddr.empty())
+    {
+        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+        brokerAddr = 
m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    }
+
+    if (!brokerAddr.empty())
+    {
+        try
+        {
+            return 
m_pMQClientFactory->getMQClientAPIImpl()->getEarliestMsgStoretime(brokerAddr,
+                    mq.getTopic(), mq.getQueueId(), 1000 * 3);
+        }
+        catch (MQClientException e)
+        {
+            THROW_MQEXCEPTION(MQClientException, "Invoke Broker[" + brokerAddr 
+ "] exception", -1);
+        }
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + 
"] not exist", -1);
+}
+
+MessageExt* MQAdminImpl::viewMessage(const std::string& msgId)
+{
+    try
+    {
+        MessageId messageId = MessageDecoder::decodeMessageId(msgId);
+        return m_pMQClientFactory->getMQClientAPIImpl()->viewMessage(
+                   socketAddress2String(messageId.getAddress()), 
messageId.getOffset(), 1000 * 3);
+    }
+    catch (UnknownHostException e)
+    {
+        THROW_MQEXCEPTION(MQClientException, "message id illegal", -1);
+    }
+}
+
+QueryResult MQAdminImpl::queryMessage(const std::string& topic,
+                                      const std::string& key,
+                                      int maxNum, long long begin, long long 
end)
+{
+    //TODO
+    std::list<MessageExt*> messageList;
+    QueryResult result(0, messageList);
+
+    return result;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQAdminImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQAdminImpl.h 
b/rocketmq-client4cpp/src/MQAdminImpl.h
new file mode 100755
index 0000000..907d61e
--- /dev/null
+++ b/rocketmq-client4cpp/src/MQAdminImpl.h
@@ -0,0 +1,63 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MQADMINIMPL_H__
+#define __MQADMINIMPL_H__
+
+#include <string>
+#include <list>
+#include <set>
+#include <vector>
+
+#include "MessageExt.h"
+#include "QueryResult.h"
+
+namespace rmq
+{
+    class MQClientFactory;
+    class MessageQueue;
+
+    class MQAdminImpl
+    {
+    public:
+        MQAdminImpl(MQClientFactory* pMQClientFactory);
+        ~MQAdminImpl();
+
+        void createTopic(const std::string& key, const std::string& newTopic, 
int queueNum);
+               void createTopic(const std::string& key, const std::string& 
newTopic, int queueNum, int topicSysFlag);
+
+        std::vector<MessageQueue>* fetchPublishMessageQueues(const 
std::string& topic);
+        std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& 
topic);
+        long long searchOffset(const MessageQueue& mq, long long timestamp);
+        long long maxOffset(const MessageQueue& mq);
+        long long minOffset(const MessageQueue& mq);
+
+        long long earliestMsgStoreTime(const MessageQueue& mq);
+
+        MessageExt* viewMessage(const std::string& msgId);
+
+        QueryResult queryMessage(const std::string& topic,
+                                 const std::string& key,
+                                 int maxNum,
+                                 long long begin,
+                                 long long end);
+
+    private:
+        MQClientFactory* m_pMQClientFactory;
+    };
+}
+
+#endif

Reply via email to