http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/Mutex.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/Mutex.cpp 
b/rocketmq-client4cpp/src/kpr/Mutex.cpp
new file mode 100755
index 0000000..f98282c
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/Mutex.cpp
@@ -0,0 +1,296 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Mutex.h"
+
+#include <pthread.h>
+#include <unistd.h>
+#include <stdio.h>
+#include <errno.h>
+#include <time.h>
+
+
+namespace kpr
+{
+Mutex::Mutex()
+{
+    ::pthread_mutex_init(&m_mutex,  NULL);
+}
+
+Mutex::~Mutex()
+{
+    ::pthread_mutex_destroy(&m_mutex);
+}
+
+void Mutex::Lock() const
+{
+    ::pthread_mutex_lock(&m_mutex);
+}
+
+bool Mutex::TryLock() const
+{
+    int ret = ::pthread_mutex_trylock(&m_mutex);
+    return (ret == 0);
+}
+
+bool Mutex::TryLock(int timeout) const
+{
+       struct timespec ts;
+       clock_gettime(CLOCK_REALTIME, &ts);
+       ts.tv_sec += (timeout/1000);
+       ts.tv_nsec += (timeout%1000) * 1000 * 1000;
+
+    int ret = ::pthread_mutex_timedlock(&m_mutex, &ts);
+    return (ret == 0);
+}
+
+
+void Mutex::Unlock() const
+{
+    ::pthread_mutex_unlock(&m_mutex);
+}
+
+//***********
+//RWMutex
+//***************
+RWMutex::RWMutex()
+{
+    ::pthread_rwlock_init(&m_mutex,  NULL);
+}
+
+RWMutex::~RWMutex()
+{
+    ::pthread_rwlock_destroy(&m_mutex);
+}
+
+void RWMutex::ReadLock() const
+{
+    ::pthread_rwlock_rdlock(&m_mutex);
+}
+
+void RWMutex::WriteLock() const
+{
+    ::pthread_rwlock_wrlock(&m_mutex);
+}
+
+bool RWMutex::TryReadLock() const
+{
+    int ret = ::pthread_rwlock_tryrdlock(&m_mutex);
+    return (ret == 0);
+}
+
+bool RWMutex::TryReadLock(int timeout) const
+{
+       struct timespec ts;
+       clock_gettime(CLOCK_REALTIME, &ts);
+       ts.tv_sec += (timeout/1000);
+       ts.tv_nsec += (timeout%1000) * 1000 * 1000;
+
+    int ret = ::pthread_rwlock_timedrdlock(&m_mutex, &ts);
+    return (ret == 0);
+}
+
+bool RWMutex::TryWriteLock() const
+{
+    int ret = ::pthread_rwlock_trywrlock(&m_mutex);
+    return (ret == 0);
+}
+
+bool RWMutex::TryWriteLock(int timeout) const
+{
+       struct timespec ts;
+       clock_gettime(CLOCK_REALTIME, &ts);
+       ts.tv_sec += (timeout/1000);
+       ts.tv_nsec += (timeout%1000) * 1000 * 1000;
+
+    int ret = ::pthread_rwlock_timedwrlock(&m_mutex, &ts);
+    return (ret == 0);
+}
+
+
+void RWMutex::Unlock() const
+{
+    ::pthread_rwlock_unlock(&m_mutex);
+}
+
+
+//***********
+//RecursiveMutex
+//***************
+RecursiveMutex::RecursiveMutex()
+    : m_count(0),
+      m_owner(ThreadId())
+{
+    ::pthread_mutex_init(&m_mutex,  NULL);
+}
+
+RecursiveMutex::~RecursiveMutex()
+{
+    ::pthread_mutex_destroy(&m_mutex);
+}
+
+bool RecursiveMutex::Lock()const
+{
+    return ((RecursiveMutex*)this)->lock(1);
+}
+
+bool RecursiveMutex::Unlock()const
+{
+    return ((RecursiveMutex*)this)->unlock();
+}
+
+bool RecursiveMutex::TryLock()const
+{
+    return ((RecursiveMutex*)this)->tryLock();
+}
+
+ThreadId RecursiveMutex::GetOwner()const
+{
+    m_internal.Lock();
+    ThreadId id;
+    if (m_count > 0)
+    {
+        id = m_owner;
+    }
+    m_internal.Unlock();
+
+    return id;
+}
+
+bool RecursiveMutex::lock(int count)
+{
+    bool rc = false;
+    bool obtained = false;
+
+    while (!obtained)
+    {
+        m_internal.Lock();
+
+        if (m_count == 0)
+        {
+            m_count = count;
+            m_owner = ThreadId::GetCurrentThreadId();
+            obtained = true;
+            rc = true;
+
+            try
+            {
+                ::pthread_mutex_lock(&m_mutex);
+            }
+            catch (...)
+            {
+                try
+                {
+                    m_internal.Unlock();
+                }
+                catch (...)
+                {
+                }
+                throw;
+            }
+        }
+        else if (m_owner == ThreadId::GetCurrentThreadId())
+        {
+            m_count += count;
+            obtained = true;
+        }
+
+        m_internal.Unlock();
+
+        if (!obtained)
+        {
+            ::pthread_mutex_lock(&m_mutex);
+            ::pthread_mutex_unlock(&m_mutex);
+        }
+    }
+
+    return rc;
+}
+
+bool RecursiveMutex::tryLock()
+{
+    bool obtained = false;
+
+    m_internal.Lock();
+
+    if (m_count == 0)
+    {
+        m_count = 1;
+        m_owner = ThreadId::GetCurrentThreadId();
+        obtained = true;
+
+        try
+        {
+            ::pthread_mutex_lock(&m_mutex);
+        }
+        catch (...)
+        {
+            try
+            {
+                m_internal.Unlock();
+            }
+            catch (...)
+            {
+            }
+            throw;
+        }
+    }
+    else if (m_owner == ThreadId::GetCurrentThreadId())
+    {
+        ++m_count;
+        obtained = true;
+    }
+
+    m_internal.Unlock();
+
+    return obtained;
+}
+
+bool RecursiveMutex::unlock()
+{
+    bool rc;
+    m_internal.Lock();
+
+    if (--m_count == 0)
+    {
+        m_owner = ThreadId();
+
+        ::pthread_mutex_unlock(&m_mutex);
+
+        rc = true;
+    }
+    else
+    {
+        rc = false;
+    }
+
+    m_internal.Unlock();
+
+    return rc;
+}
+
+unsigned int RecursiveMutex::reset4Condvar()
+{
+    m_internal.Lock();
+
+    unsigned int count = m_count;
+    m_count = 0;
+    m_owner = ThreadId();
+
+    m_internal.Unlock();
+
+    return count;
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/Mutex.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/Mutex.h 
b/rocketmq-client4cpp/src/kpr/Mutex.h
new file mode 100755
index 0000000..fc3498f
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/Mutex.h
@@ -0,0 +1,107 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __KPR_MUTEX_H__
+#define __KPR_MUTEX_H__
+
+#include "KPRTypes.h"
+#include <errno.h>
+
+namespace kpr
+{
+class Mutex
+{
+public:
+    Mutex();
+    ~Mutex();
+
+    void Lock()const;
+    void Unlock()const;
+    bool TryLock()const;
+       bool TryLock(int timeout) const;
+
+    ThreadId GetOwner()const;
+
+private:
+    Mutex(const Mutex&);
+    const Mutex& operator=(const Mutex&);
+
+    mutable pthread_mutex_t m_mutex;
+    friend class Condition;
+};
+
+class RWMutex
+{
+public:
+    RWMutex();
+    ~RWMutex();
+
+    void ReadLock()const;
+    void WriteLock()const;
+    bool TryReadLock()const;
+       bool TryReadLock(int timeout) const;
+    bool TryWriteLock()const;
+       bool TryWriteLock(int timeout)const;
+    void Unlock()const;
+
+    ThreadId GetOwner()const;
+
+private:
+    RWMutex(const RWMutex&);
+    const RWMutex& operator=(const RWMutex&);
+
+    mutable pthread_rwlock_t m_mutex;
+    friend class Condition;
+};
+
+class RecursiveMutex
+{
+public:
+    RecursiveMutex();
+    ~RecursiveMutex();
+
+    bool Lock()const;
+    bool Unlock()const;
+    bool TryLock()const;
+
+    ThreadId GetOwner()const;
+
+    unsigned int GetCount()const
+    {
+        return m_count;
+    }
+
+private:
+    RecursiveMutex(const RecursiveMutex&);
+
+    const RecursiveMutex& operator=(const RecursiveMutex&);
+
+    bool  lock(int count);
+    bool  tryLock();
+    bool unlock();
+
+    unsigned int reset4Condvar();
+
+private:
+    pthread_mutex_t m_mutex;
+    Mutex m_internal;
+    mutable unsigned int m_count;
+    mutable ThreadId m_owner;
+
+    friend class Condition;
+    friend class ConditionHelper;
+};
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/RefHandle.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/RefHandle.h 
b/rocketmq-client4cpp/src/kpr/RefHandle.h
new file mode 100644
index 0000000..fd7d741
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/RefHandle.h
@@ -0,0 +1,328 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __KPR_REFHANDLET_H__
+#define __KPR_REFHANDLET_H__
+
+#include "KPRTypes.h"
+#include "AtomicValue.h"
+#include "Exception.h"
+
+namespace kpr
+{
+
+class RefCount
+{
+public:
+       RefCount& operator=(const RefCount&)
+       {
+       return *this;
+       }
+
+       void incRef()
+       {
+       m_refCount++;
+       }
+
+       void decRef()
+       {
+           if (--m_refCount == 0 && !m_noDelete)
+           {
+               m_noDelete = true;
+               delete this;
+           }
+       }
+
+       int getRef() const
+       {
+               return m_refCount.get();
+       }
+
+       void setNoDelete(bool b)
+       {
+               m_noDelete = b;
+       }
+
+protected:
+       RefCount()
+       : m_refCount(0), m_noDelete(false)
+       {
+       }
+
+       RefCount(const RefCount&)
+           : m_refCount(0), m_noDelete(false)
+       {
+       }
+
+       virtual ~RefCount()
+       {
+       }
+
+protected:
+       AtomicInteger m_refCount;
+       bool            m_noDelete;
+};
+
+
+
+template <class T>
+class RefHandleT
+{
+public:
+    RefHandleT(T* p = 0)
+    {
+               m_ptr = p;
+
+               if (m_ptr)
+               {
+                       m_ptr->incRef();
+               }
+    }
+
+       template<typename Y>
+    RefHandleT(const RefHandleT<Y>& v)
+    {
+               m_ptr = v.m_ptr;
+
+               if (m_ptr)
+               {
+                       m_ptr->incRef();
+               }
+    }
+
+       RefHandleT(const RefHandleT& v)
+    {
+               m_ptr = v.m_ptr;
+
+               if (m_ptr)
+               {
+                       m_ptr->incRef();
+               }
+    }
+
+    ~RefHandleT()
+    {
+               if (m_ptr)
+        {
+            m_ptr->decRef();
+        }
+    }
+
+       RefHandleT<T>& operator=(T* p)
+    {
+        if (m_ptr != p)
+        {
+                       if (p)
+            {
+                p->incRef();
+            }
+
+                       T* ptr = m_ptr;
+            m_ptr = p;
+
+            if (ptr)
+            {
+                ptr->decRef();
+            }
+        }
+
+        return *this;
+    }
+
+       template<typename Y>
+       RefHandleT<T>& operator=(const RefHandleT<Y>& v)
+    {
+        if (m_ptr != v.m_ptr)
+        {
+                       if (v.m_ptr)
+            {
+                v.m_ptr->incRef();
+            }
+
+            T* ptr = m_ptr;
+            m_ptr = v.m_ptr;
+
+            if (ptr)
+            {
+                ptr->decRef();
+            }
+        }
+
+        return *this;
+    }
+
+    RefHandleT<T>& operator=(const RefHandleT<T>& v)
+    {
+        if (m_ptr != v.m_ptr)
+        {
+                       if (v.m_ptr)
+            {
+                v.m_ptr->incRef();
+            }
+
+            T* ptr = m_ptr;
+            m_ptr = v.m_ptr;
+
+            if (ptr)
+            {
+                ptr->decRef();
+            }
+        }
+
+        return *this;
+    }
+
+    T* operator->() const
+    {
+               if (!m_ptr)
+               {
+                       THROW_EXCEPTION(RefHandleNullException, "autoptr null 
handle error", -1);
+               }
+
+        return m_ptr;
+    }
+
+    T& operator*() const
+    {
+               if (!m_ptr)
+               {
+                       THROW_EXCEPTION(RefHandleNullException, "autoptr null 
handle error", -1);
+               }
+
+        return *m_ptr;
+    }
+
+    operator T* () const
+    {
+        return m_ptr;
+    }
+
+    T* ptr() const
+    {
+        return m_ptr;
+    }
+
+    T* retn()
+    {
+        T* p = m_ptr;
+        m_ptr = 0;
+
+        return p;
+    }
+
+    bool operator==(const RefHandleT<T>& v) const
+    {
+        return m_ptr == v.m_ptr;
+    }
+
+    bool operator==(T* p) const
+    {
+        return m_ptr == p;
+    }
+
+    bool operator!=(const RefHandleT<T>& v) const
+    {
+        return m_ptr != v.m_ptr;
+    }
+
+    bool operator!=(T* p) const
+    {
+        return m_ptr != p;
+    }
+
+    bool operator!() const
+    {
+        return m_ptr == 0;
+    }
+
+    operator bool() const
+    {
+        return m_ptr != 0;
+    }
+
+    void swap(RefHandleT& other)
+    {
+        std::swap(m_ptr, other._ptr);
+    }
+
+       template<class Y>
+    static RefHandleT dynamicCast(const RefHandleT<Y>& r)
+    {
+        return RefHandleT(dynamic_cast<T*>(r._ptr));
+    }
+
+    template<class Y>
+    static RefHandleT dynamicCast(Y* p)
+    {
+        return RefHandleT(dynamic_cast<T*>(p));
+    }
+
+public:
+    T* m_ptr;
+};
+
+
+template<typename T, typename U>
+inline bool operator==(const RefHandleT<T>& lhs, const RefHandleT<U>& rhs)
+{
+    T* l = lhs.ptr();
+    U* r = rhs.ptr();
+    if(l && r)
+    {
+        return *l == *r;
+    }
+    else
+    {
+        return !l && !r;
+    }
+}
+
+
+template<typename T, typename U>
+inline bool operator!=(const RefHandleT<T>& lhs, const RefHandleT<U>& rhs)
+{
+    T* l = lhs.ptr();
+    U* r = rhs.ptr();
+    if(l && r)
+    {
+        return *l != *r;
+    }
+    else
+    {
+        return l || r;
+    }
+}
+
+
+template<typename T, typename U>
+inline bool operator<(const RefHandleT<T>& lhs, const RefHandleT<U>& rhs)
+{
+    T* l = lhs.ptr();
+    U* r = rhs.ptr();
+    if(l && r)
+    {
+        return *l < *r;
+    }
+    else
+    {
+        return !l && r;
+    }
+}
+
+}
+
+#define DECLAREVAR(T)   typedef kpr::RefHandleT<T> T ## Ptr;
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/ScopedLock.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/ScopedLock.h 
b/rocketmq-client4cpp/src/kpr/ScopedLock.h
new file mode 100755
index 0000000..6ff9dd1
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/ScopedLock.h
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __KPR_SCOPEDLOCK_H__
+#define __KPR_SCOPEDLOCK_H__
+
+namespace kpr
+{
+template <class T>
+class ScopedLock
+{
+public:
+    ScopedLock(const T& mutex)
+        : m_mutex(mutex)
+    {
+        m_mutex.Lock();
+    }
+
+    ~ScopedLock()
+    {
+        m_mutex.Unlock();
+    }
+
+private:
+    const T& m_mutex;
+};
+
+
+template <class T>
+class ScopedRLock
+{
+public:
+    ScopedRLock(const T& mutex)
+        : m_mutex(mutex)
+    {
+        m_mutex.ReadLock();
+        m_acquired = true;
+    }
+
+    ~ScopedRLock()
+    {
+        if (m_acquired)
+        {
+            m_mutex.Unlock();
+        }
+    }
+
+private:
+    const T& m_mutex;
+    mutable bool m_acquired;
+};
+
+
+template <class T>
+class ScopedWLock
+{
+public:
+    ScopedWLock(const T& mutex)
+        : m_mutex(mutex)
+    {
+        m_mutex.WriteLock();
+        m_acquired = true;
+    }
+
+    ~ScopedWLock()
+    {
+        if (m_acquired)
+        {
+            m_mutex.Unlock();
+        }
+    }
+
+private:
+    const T& m_mutex;
+    mutable bool m_acquired;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/Semaphore.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/Semaphore.cpp 
b/rocketmq-client4cpp/src/kpr/Semaphore.cpp
new file mode 100755
index 0000000..59a0eef
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/Semaphore.cpp
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Semaphore.h"
+
+#include <unistd.h>
+#include <sys/time.h>
+#include "KPRUtil.h"
+
+namespace kpr
+{
+Semaphore::Semaphore(long initial_count)
+{
+    sem_init(&m_sem, 0, initial_count);
+}
+
+Semaphore::~Semaphore()
+{
+    sem_destroy(&m_sem);
+}
+
+int Semaphore::GetValue()
+{
+       int value = 0;
+       int rc = sem_getvalue(&m_sem, &value);
+       if (rc < 0)
+       {
+               return rc;
+       }
+       return value;
+}
+
+bool Semaphore::Wait()
+{
+    int rc;
+    rc = sem_wait(&m_sem);
+    return !rc;
+}
+
+bool Semaphore::Wait(long timeout)
+{
+    int rc;
+    if (timeout < 0)
+    {
+        rc = sem_wait(&m_sem);
+    }
+    else
+    {
+        struct timespec abstime = KPRUtil::CalcAbsTime(timeout);
+        rc = sem_timedwait(&m_sem, &abstime);
+    }
+
+    return !rc;
+}
+
+void Semaphore::Release(int count)
+{
+    sem_post(&m_sem);
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/Semaphore.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/Semaphore.h 
b/rocketmq-client4cpp/src/kpr/Semaphore.h
new file mode 100755
index 0000000..2a1af7f
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/Semaphore.h
@@ -0,0 +1,42 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __KPR_SEMAPHORE_H__
+#define __KPR_SEMAPHORE_H__
+
+#include "KPRTypes.h"
+#include <errno.h>
+
+namespace kpr
+{
+
+class Semaphore
+{
+public:
+    Semaphore(long initial_count = 0);
+    ~Semaphore();
+
+       int GetValue();
+    bool Wait();
+    bool Wait(long timeout);
+
+    void Release(int count = 1);
+
+private:
+    sem_t m_sem;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/Thread.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/Thread.cpp 
b/rocketmq-client4cpp/src/kpr/Thread.cpp
new file mode 100755
index 0000000..d80819b
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/Thread.cpp
@@ -0,0 +1,191 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Thread.h"
+
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <errno.h>
+#include <assert.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <signal.h>
+
+#include "ScopedLock.h"
+#include "Exception.h"
+
+//for log
+#include "RocketMQClient.h"
+
+namespace kpr
+{
+kpr::AtomicInteger Thread::s_threadNumber = 0;
+
+void* Thread::ThreadRoute(void* pArg)
+{
+    Thread* tv = ((Thread*)pArg);
+
+    try
+    {
+        tv->Startup();
+    }
+    catch (...)
+    {
+    }
+
+    try
+    {
+        tv->Cleanup();
+    }
+    catch (...)
+    {
+    }
+
+    return 0;
+}
+
+Thread::Thread(const char* name)
+{
+    m_started = false;
+    m_threadId = ThreadId();
+    m_threadNumber = s_threadNumber++;
+
+    SetName(name);
+}
+
+Thread::~Thread()
+{
+    try
+    {
+    }
+    catch (...)
+    {
+    }
+}
+
+void Thread::SetName(const char* name)
+{
+    ScopedLock<Mutex> guard(m_mutex);
+
+    if (name == NULL)
+    {
+        snprintf(m_name, sizeof(m_name), "Thread-%u", m_threadNumber);
+    }
+    else
+    {
+        snprintf(m_name, sizeof(m_name), "%s", name);
+    }
+}
+
+const char* Thread::GetName() const
+{
+    ScopedLock<Mutex> guard(m_mutex);
+    return m_name;
+}
+
+void Thread::Start()
+{
+    ScopedLock<Mutex> guard(m_mutex);
+
+    if (m_started)
+    {
+        return;
+    }
+
+    pthread_attr_t attr;
+    int retcode = 0;
+    retcode = pthread_attr_init(&attr);
+    if (retcode != 0)
+    {
+        THROW_EXCEPTION(SystemCallException, "pthread_attr_init failed!", 
errno)
+    }
+
+    pthread_t id;
+    retcode = pthread_create(&id, &attr, ThreadRoute, (void*)this);
+    if (retcode != 0)
+    {
+        THROW_EXCEPTION(SystemCallException, "pthread_create error", errno)
+    }
+
+    m_threadId = id;
+    pthread_attr_destroy(&attr);
+    m_started = true;
+    RMQ_DEBUG("thread[%s][%ld] start successfully", m_name, (long)id);
+}
+
+void Thread::Run()
+{
+    //TODO support runable
+}
+
+bool Thread::IsAlive() const
+{
+    if (m_started)
+    {
+        int retcode = pthread_kill(m_threadId, 0);
+        return (retcode == ESRCH);
+    }
+
+    return false;
+}
+
+void  Thread::Join()
+{
+    if (m_started)
+    {
+        pthread_join(m_threadId, NULL);
+    }
+}
+
+void Thread::Sleep(long millis, int nanos)
+{
+    assert(millis >= 0 && nanos >= 0 && nanos < 999999);
+    struct timespec tv;
+    tv.tv_sec = millis / 1000;
+    tv.tv_nsec = (millis % 1000) * 1000000 + nanos;
+    nanosleep(&tv, 0);
+}
+
+void Thread::Yield()
+{
+    pthread_yield();
+}
+
+ThreadId Thread::GetId() const
+{
+    ScopedLock<Mutex> guard(m_mutex);
+    return m_threadId;
+}
+
+void Thread::Startup()
+{
+    try
+    {
+       RMQ_INFO("thread[%s] started", GetName());
+        Run();
+    }
+    catch (...)
+    {
+    }
+}
+
+void Thread::Cleanup()
+{
+       RMQ_INFO("thread[%s] end", GetName());
+}
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/Thread.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/Thread.h 
b/rocketmq-client4cpp/src/kpr/Thread.h
new file mode 100755
index 0000000..ef2590e
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/Thread.h
@@ -0,0 +1,68 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __KPR_THREAD_H__
+#define __KPR_THREAD_H__
+
+#include "KPRTypes.h"
+#include "RefHandle.h"
+#include "Mutex.h"
+
+#ifdef Yield
+#undef Yield
+#endif
+
+namespace kpr
+{
+class Thread : public virtual kpr::RefCount
+{
+public:
+    Thread(const char* name = NULL);
+    virtual ~Thread();
+
+    virtual void Run();
+    void Start();
+    bool IsAlive() const;
+    void Join();
+    ThreadId   GetId() const;
+
+    void SetName(const char*);
+    const char* GetName() const;
+
+    void Startup();
+    void Cleanup();
+
+    static void  Sleep(long millis, int nano = 0);
+    static void  Yield();
+
+private:
+    Thread(const Thread&);
+    const Thread& operator=(const Thread&);
+    static void* ThreadRoute(void* pArg);
+
+private:
+    ThreadId m_threadId;
+    unsigned int m_threadNumber;
+    char m_name[128];
+    bool m_started;
+    Mutex m_mutex;
+
+    static kpr::AtomicInteger s_threadNumber;
+};
+typedef kpr::RefHandleT<Thread> ThreadPtr;
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp 
b/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp
new file mode 100755
index 0000000..32cba5b
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp
@@ -0,0 +1,56 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ThreadLocal.h"
+
+#include <errno.h>
+
+#include "Exception.h"
+
+namespace kpr
+{
+ThreadLocal::ThreadLocal()
+    : m_Key(0)
+{
+    int retcode = 0;
+
+    retcode = pthread_key_create(&m_Key, 0);
+    if (retcode != 0)
+    {
+        THROW_EXCEPTION(SystemCallException, "pthread_key_create error", 
errno);
+    }
+}
+
+ThreadLocal::~ThreadLocal()
+{
+    pthread_key_delete(m_Key);
+}
+
+void* ThreadLocal::GetValue()
+{
+    void* v;
+    v = pthread_getspecific(m_Key);
+    return v;
+}
+
+void ThreadLocal::SetValue(void* value)
+{
+    int retcode = pthread_setspecific(m_Key, value);
+    if (retcode != 0)
+    {
+        THROW_EXCEPTION(SystemCallException, "pthread_setspecific error", 
errno);
+    }
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/ThreadLocal.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/ThreadLocal.h 
b/rocketmq-client4cpp/src/kpr/ThreadLocal.h
new file mode 100644
index 0000000..9ec8f43
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/ThreadLocal.h
@@ -0,0 +1,37 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __KPR_THREADLOCAL_H__
+#define __KPR_THREADLOCAL_H__
+
+#include "KPRTypes.h"
+
+namespace kpr
+{
+class ThreadLocal
+{
+public:
+    ThreadLocal();
+    virtual ~ThreadLocal();
+
+    void* GetValue();
+    void SetValue(void* value);
+
+private:
+    ThreadKey   m_Key;
+};
+};
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/ThreadPool.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/ThreadPool.cpp 
b/rocketmq-client4cpp/src/kpr/ThreadPool.cpp
new file mode 100755
index 0000000..32557a8
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/ThreadPool.cpp
@@ -0,0 +1,418 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ThreadPool.h"
+
+#include "RocketMQClient.h"
+#include "ScopedLock.h"
+#include "KPRUtil.h"
+
+namespace kpr
+{
+ThreadPoolWorker:: ThreadPoolWorker(ThreadPool* pThreadPool, const char* 
strName)
+    : kpr::Thread(strName),
+      m_pThreadPool(pThreadPool),
+      m_canWork(false),
+      m_isWaiting(false),
+      m_stop(false),
+      m_idleTime(0),
+      m_idle(true)
+{
+
+}
+
+bool ThreadPoolWorker::IsIdle()
+{
+    kpr::ScopedLock<kpr::Monitor> lock(*this);
+    return m_idle;
+}
+
+void ThreadPoolWorker:: SetIdle(bool idle)
+{
+    kpr::ScopedLock<kpr::Monitor> lock(*this);
+    m_idle = idle;
+    m_idleTime = 0;
+}
+
+int ThreadPoolWorker::IdleTime(int idleTime)
+{
+    if (m_idle)
+    {
+        m_idleTime += idleTime;
+    }
+    else
+    {
+        m_idleTime = 0;
+    }
+
+    return m_idleTime;
+}
+
+void ThreadPoolWorker::Run()
+{
+    while (!m_stop)
+    {
+        SetIdle(true);
+        {
+            kpr::ScopedLock<kpr::Monitor> lock(*this);
+            while (!m_canWork)
+            {
+                try
+                {
+                    m_isWaiting = true;
+                    Wait();
+                    m_isWaiting = false;
+                }
+                catch (...)
+                {
+                }
+            }
+
+            m_canWork = false;
+        }
+
+        while (!m_stop)
+        {
+            ThreadPoolWorkPtr request = m_pThreadPool->GetWork(this);
+            if ((ThreadPoolWork*)(NULL) == request)
+            {
+                break;
+            }
+
+            SetIdle(false);
+
+            try
+            {
+               request->Do();
+            }
+            catch(...)
+            {
+               RMQ_ERROR("thead[%s] doWork exception", GetName());
+            }
+
+                       //delete request;
+            request = NULL;
+        }
+
+        if (m_stop || m_pThreadPool->IsDestroy())
+        {
+            break;
+        }
+    }
+
+    m_pThreadPool ->RemoveThread(this);
+    m_pThreadPool = NULL;
+}
+
+void ThreadPoolWorker::WakeUp()
+{
+    SetIdle(false);
+    kpr::ScopedLock<kpr::Monitor> lock(*this);
+    m_canWork = true;
+    Notify();
+}
+
+void ThreadPoolWorker::Stop()
+{
+    kpr::ScopedLock<kpr::Monitor> lock(*this);
+    m_canWork = true;
+    m_stop = true;
+    Notify();
+}
+
+bool ThreadPoolWorker:: IsWaiting()
+{
+    kpr::ScopedLock<kpr::Monitor> lock(*this);
+    return m_isWaiting;
+}
+
+ThreadPool::ThreadPool(const char* name,
+                        int count,
+                        int minCount,
+                        int maxCount,
+                        int step,
+                        int maxIdleTime,
+                        int checkldleThreadsInterval)
+{
+    if (name == NULL)
+    {
+        snprintf(m_name, sizeof(m_name), "ThreadPool");
+    }
+    else
+    {
+        snprintf(m_name, sizeof(m_name), "%s", name);
+    }
+
+    m_destroy = false;
+    m_minCount = minCount;
+    m_maxCount = maxCount;
+    m_maxIdleTime = maxIdleTime;
+    m_count = 0;
+    m_step = step;
+    m_index = 0;
+
+    m_lastRemoveIdleThreadsTime = KPRUtil::GetCurrentTimeMillis();
+
+    if (m_minCount <= 0)
+    {
+        m_minCount = MIN_THREAD_COUNT;
+    }
+
+    if (m_maxCount < 0)
+    {
+        m_maxCount = MAX_THREAD_COUNT;
+    }
+
+    if (m_maxIdleTime < 0)
+    {
+        m_maxIdleTime = MAX_IDLE_THREAD_TIME;
+    }
+
+    if (m_maxCount != 0 && m_maxCount < m_minCount)
+    {
+        m_minCount = MIN_THREAD_COUNT;
+    }
+
+    if ((m_maxCount != 0 && count > m_maxCount) || count < m_minCount)
+    {
+        count = m_minCount;
+    }
+
+    if (checkldleThreadsInterval < 0)
+    {
+        checkldleThreadsInterval = CHECK_IDLE_THREADS_INTERVAL;
+    }
+
+    AddThreads(count);
+
+       char manager_name[32];
+       snprintf(manager_name, sizeof(manager_name), "%s-manager", m_name);
+    m_manager = new ThreadPoolManage(manager_name, this, 
checkldleThreadsInterval);
+    m_manager->Start();
+}
+
+ThreadPool::~ThreadPool()
+{
+    Destroy();
+}
+
+void ThreadPool::AddThreads(int count)
+{
+    char threadName[256];
+
+    for (int i = 0; i < count; ++i)
+    {
+        snprintf(threadName, sizeof(threadName), "%s-Worker%d", m_name, 
m_index);
+
+        try
+        {
+            ThreadPoolWorkerPtr worker = new ThreadPoolWorker(this, 
threadName);
+            worker->Start();
+
+            m_workers.push_back(worker);
+            while (!worker->IsWaiting())
+            {
+                kpr::Thread::Sleep(0, 100000);
+            }
+
+            m_index++;
+            m_count++;
+        }
+        catch (...)
+        {
+                       RMQ_ERROR("ThreadPool thead[%s] new exception", 
threadName);
+        }
+    }
+}
+
+void ThreadPool::Destroy()
+{
+    std::list<ThreadPoolWorkerPtr> workers;
+    {
+        kpr::ScopedLock<kpr::Monitor> lock(*this);
+        if (m_destroy)
+        {
+            return;
+        }
+
+        m_destroy = true;
+
+        std::list<ThreadPoolWorkerPtr>::iterator iter;
+        for (iter = m_workers.begin(); iter != m_workers.end(); iter++)
+        {
+            workers.push_back(*iter);
+            (*iter)->Stop();
+        }
+    }
+
+    m_manager->Stop();
+    m_manager->Join();
+
+    std::list<ThreadPoolWorkerPtr>::iterator itThread;
+    for (itThread = workers.begin(); itThread != workers.end(); itThread++)
+    {
+        (*itThread)->Join();
+    }
+    m_works.clear();
+}
+
+int ThreadPool::AddWork(ThreadPoolWorkPtr pWork)
+{
+    kpr::ScopedLock<kpr::Monitor> lock(*this);
+    if (m_destroy)
+    {
+        return -1;
+    }
+
+    m_works.push_back(pWork);
+
+    if (!WakeOneThread())
+    {
+        if (0 == m_maxCount || m_count < m_maxCount)
+        {
+            int step = m_step;
+
+            if (0 < m_maxCount && m_count + m_step > m_maxCount)
+            {
+                step = m_maxCount - m_count;
+            }
+
+            AddThreads(step);
+            WakeOneThread();
+        }
+    }
+
+    return 0;
+}
+
+ThreadPoolWorkPtr ThreadPool::GetWork(ThreadPoolWorker* pWorker)
+{
+    kpr::ScopedLock<kpr::Monitor> lock(*this);
+    ThreadPoolWorkPtr result = NULL;
+
+    if (!m_destroy && !m_works.empty())
+    {
+        result = m_works.front();
+        m_works.pop_front();
+    }
+
+    return result;
+}
+
+bool ThreadPool::IsDestroy()
+{
+    kpr::ScopedLock<kpr::Monitor> lock(*this);
+    return m_destroy;
+}
+
+void ThreadPool::RemoveThread(ThreadPoolWorker* workerThread)
+{
+    kpr::ScopedLock<kpr::Monitor> lock(*this);
+
+    std::list<ThreadPoolWorkerPtr>::iterator it = m_workers.begin();
+
+    for (; it != m_workers.end(); it++)
+    {
+        if ((*it) == workerThread)
+        {
+            m_workers.erase(it);
+            m_count--;
+            break;
+        }
+    }
+}
+
+void ThreadPool::RemoveIdleThreads()
+{
+    kpr::ScopedLock<kpr::Monitor> lock(*this);
+
+    if (m_maxIdleTime == 0)
+    {
+        return;
+    }
+
+    unsigned long long time = KPRUtil::GetCurrentTimeMillis();
+    int interval = (int)(time - m_lastRemoveIdleThreadsTime);
+    m_lastRemoveIdleThreadsTime = time;
+
+    std::list<ThreadPoolWorkerPtr>::iterator it = m_workers.begin();
+    int size = (int)m_workers.size();
+    while (size > m_minCount && it != m_workers.end())
+    {
+        if ((*it)->IdleTime(interval) > m_maxIdleTime)
+        {
+            (*it)->Stop();
+            size--;
+        }
+
+        it++;
+    }
+}
+
+bool ThreadPool::WakeOneThread()
+{
+    std::list<ThreadPoolWorkerPtr>::iterator it = m_workers.begin();
+    for (; it != m_workers.end(); it++)
+    {
+        if ((*it)->IsIdle())
+        {
+            (*it)->WakeUp();
+            return true;
+        }
+    }
+
+    return false;
+}
+
+ThreadPoolManage::ThreadPoolManage(const char* name, ThreadPool* pThreadPool, 
int checkldleThreadsInterval)
+    : kpr::Thread(name),
+      m_pThreadPool(pThreadPool),
+      m_stop(false),
+      m_checkIdleThreadsInterval(checkldleThreadsInterval)
+{
+}
+
+ThreadPoolManage::~ThreadPoolManage()
+{
+}
+
+void ThreadPoolManage::Stop()
+{
+    kpr::ScopedLock<kpr::Monitor> lock(*this);
+    m_stop = true;
+    Notify();
+}
+
+void ThreadPoolManage::Run()
+{
+    while (!m_stop)
+    {
+        {
+            kpr::ScopedLock<kpr::Monitor> lock(*this);
+            if (!m_stop)
+            {
+                Wait(m_checkIdleThreadsInterval);
+            }
+
+            if (m_stop)
+            {
+                break;
+            }
+        }
+
+        m_pThreadPool->RemoveIdleThreads();
+    }
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/ThreadPool.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/ThreadPool.h 
b/rocketmq-client4cpp/src/kpr/ThreadPool.h
new file mode 100755
index 0000000..2c7e3ff
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/ThreadPool.h
@@ -0,0 +1,124 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __KPR_THREADPOOL_H__
+#define __KPR_THREADPOOL_H__
+
+#include<time.h>
+#include <assert.h>
+#include <list>
+#include "Mutex.h"
+#include "Thread.h"
+#include "Monitor.h"
+
+#include "ThreadPoolWork.h"
+
+namespace kpr
+{
+const int MAX_THREAD_COUNT = 300;
+const int MIN_THREAD_COUNT = 1;
+//const int MAX_IDLE_THREAD_TIME = 600000;
+const int MAX_IDLE_THREAD_TIME = 0;
+const int THREAD_STEP = 10;
+const int CHECK_IDLE_THREADS_INTERVAL = 30000;
+
+class ThreadPool;
+class ThreadPoolWorker : public kpr::Thread, public kpr::Monitor
+{
+public:
+    ThreadPoolWorker(ThreadPool* pThreadPool, const char* strName);
+
+    virtual void Run();
+    void WakeUp();
+    void Stop();
+    bool IsWaiting();
+    bool IsIdle();
+    void SetIdle(bool idle);
+    int IdleTime(int idleTime);
+
+private:
+    ThreadPool* m_pThreadPool;
+    bool m_canWork;
+    bool m_isWaiting;
+    bool m_stop;
+    int m_idleTime;
+    bool m_idle;
+};
+typedef kpr::RefHandleT<ThreadPoolWorker> ThreadPoolWorkerPtr;
+
+class ThreadPoolManage : public kpr::Thread, public kpr::Monitor
+{
+public:
+    ThreadPoolManage(const char* name, ThreadPool* pThreadPool, int 
nCheckldleThreadsInterval);
+
+    ~ThreadPoolManage();
+    virtual void Run();
+    void Stop();
+
+private:
+    ThreadPool* m_pThreadPool;
+    bool m_stop;
+    int m_checkIdleThreadsInterval;
+};
+typedef kpr::RefHandleT<ThreadPoolManage> ThreadPoolManagePtr;
+
+
+class ThreadPool : public kpr::RefCount, public kpr::Monitor
+{
+public:
+    ThreadPool(const char* name,
+               int initCount,
+               int minCount,
+               int maxCount,
+               int step = THREAD_STEP,
+               int maxIdleTime = MAX_IDLE_THREAD_TIME,
+               int checkldleThreadsInterval = CHECK_IDLE_THREADS_INTERVAL);
+
+    ~ThreadPool();
+    void Destroy();
+
+    int AddWork(ThreadPoolWorkPtr pWork);
+    ThreadPoolWorkPtr GetWork(ThreadPoolWorker* pWorker);
+
+    void RemoveIdleThreads();
+    void RemoveThread(ThreadPoolWorker* pWorker);
+
+    bool WakeOneThread();
+    bool IsDestroy();
+
+private:
+    void AddThreads(int count);
+
+private:
+    bool m_destroy;
+    int m_minCount;
+    int m_maxCount;
+    int m_maxIdleTime;
+    int m_count;
+    int m_step;
+
+    char m_name[128];
+    unsigned int m_index;
+    unsigned long long m_lastRemoveIdleThreadsTime;
+
+    ThreadPoolManagePtr m_manager;
+    std::list<ThreadPoolWorkPtr> m_works;
+    std::list<ThreadPoolWorkerPtr> m_workers;
+};
+
+typedef kpr::RefHandleT<ThreadPool> ThreadPoolPtr;
+
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h 
b/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h
new file mode 100644
index 0000000..30dfe6c
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h
@@ -0,0 +1,34 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __THREADPOOLWORK_H__
+#define __THREADPOOLWORK_H__
+
+#include "RefHandle.h"
+
+namespace kpr
+{
+
+class ThreadPoolWork : public kpr::RefCount
+{
+public:
+    virtual ~ThreadPoolWork() {}
+    virtual void Do() = 0;
+};
+typedef kpr::RefHandleT<ThreadPoolWork> ThreadPoolWorkPtr;
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp 
b/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp
new file mode 100755
index 0000000..42ef672
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp
@@ -0,0 +1,91 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "TimerTaskManager.h"
+#include "ThreadPool.h"
+#include "ScopedLock.h"
+
+namespace kpr
+{
+TimerTaskManager::TimerTaskManager()
+{
+}
+
+TimerTaskManager::~TimerTaskManager()
+{
+}
+
+int TimerTaskManager::Init(int maxThreadCount, int checklnteval)
+{
+    try
+    {
+        m_pThreadPool = new ThreadPool("TimerThreadPool", 5, 5, 
maxThreadCount);
+        m_timerThread = new TimerThread("TimerThread", checklnteval);
+        m_timerThread->Start();
+    }
+    catch (...)
+    {
+        return -1;
+    }
+
+    return 0;
+}
+
+unsigned int TimerTaskManager::RegisterTimer(unsigned int initialDelay, 
unsigned int elapse, TimerTaskPtr pTask)
+{
+    unsigned int id = m_timerThread->RegisterTimer(initialDelay, elapse, this, 
true);
+
+    kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
+    m_timerTasks[id] = pTask;
+
+    return id;
+}
+
+bool TimerTaskManager::UnRegisterTimer(unsigned int timerId)
+{
+    bool ret = m_timerThread->UnRegisterTimer(timerId);
+
+    kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
+    m_timerTasks.erase(timerId);
+
+    return ret;
+}
+
+bool TimerTaskManager::ResetTimer(unsigned int timerId)
+{
+    return m_timerThread->ResetTimer(timerId);
+}
+
+void TimerTaskManager::OnTimeOut(unsigned int timerId)
+{
+    kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
+    std::map<unsigned int, TimerTaskPtr>::iterator it = 
m_timerTasks.find(timerId);
+    if (it != m_timerTasks.end())
+    {
+        if (!it->second->IsProcessing())
+        {
+            it->second->SetProcessing(true);
+            m_pThreadPool->AddWork((it->second).ptr());
+        }
+    }
+}
+
+void TimerTaskManager::Stop()
+{
+    m_timerThread->Stop();
+    m_timerThread->Join();
+    m_pThreadPool->Destroy();
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/TimerTaskManager.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/TimerTaskManager.h 
b/rocketmq-client4cpp/src/kpr/TimerTaskManager.h
new file mode 100755
index 0000000..b9cc2e0
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/TimerTaskManager.h
@@ -0,0 +1,95 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __KPR_TIMERTASKMANAGER_H__
+#define __KPR_TIMERTASKMANAGER_H__
+
+#include <list>
+#include <map>
+
+#include "RocketMQClient.h"
+#include "TimerThread.h"
+#include "ThreadPool.h"
+#include "ThreadPoolWork.h"
+
+namespace kpr
+{
+
+class TimerTask : public kpr::ThreadPoolWork
+{
+public:
+    TimerTask()
+        : m_isProcessing(false)
+    {
+    }
+
+       virtual ~TimerTask()
+       {
+       }
+
+    virtual void Do()
+    {
+               try
+               {
+               DoTask();
+               }
+               catch(...)
+               {
+                       RMQ_ERROR("TimerTask exception");
+               }
+        m_isProcessing = false;
+    }
+
+    bool IsProcessing()
+    {
+        return m_isProcessing;
+    }
+
+    void SetProcessing(bool isProcessing)
+    {
+        m_isProcessing = isProcessing;
+    }
+
+    virtual void DoTask() = 0;
+
+private:
+    bool m_isProcessing;
+};
+typedef kpr::RefHandleT<TimerTask> TimerTaskPtr;
+
+
+class TimerTaskManager : public TimerHandler
+{
+public:
+    TimerTaskManager();
+    virtual ~TimerTaskManager();
+
+    int Init(int maxThreadCount, int checklnteval);
+    unsigned int RegisterTimer(unsigned int initialDelay, unsigned int elapse, 
TimerTaskPtr pTask);
+    bool UnRegisterTimer(unsigned int timerId);
+    bool ResetTimer(unsigned int timerId);
+    void Stop();
+
+    virtual void OnTimeOut(unsigned int timerId);
+
+private:
+    std::map<unsigned int, TimerTaskPtr> m_timerTasks;
+    kpr::Mutex m_mutex;
+    TimerThreadPtr m_timerThread;
+    kpr::ThreadPoolPtr m_pThreadPool;
+};
+
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/TimerThread.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/TimerThread.cpp 
b/rocketmq-client4cpp/src/kpr/TimerThread.cpp
new file mode 100755
index 0000000..b127074
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/TimerThread.cpp
@@ -0,0 +1,186 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "TimerThread.h"
+#include "KPRUtil.h"
+#include "ScopedLock.h"
+
+namespace kpr
+{
+unsigned int TimerThread::s_nextTimerID = 0;
+
+TimerThread::TimerThread(const char* name, unsigned int checklnterval)
+    : kpr::Thread(name), m_closed(false), m_checkInterval(checklnterval)
+{
+}
+
+TimerThread::~TimerThread()
+{
+}
+
+void TimerThread::Run()
+{
+    unsigned long long lastCheckTime = KPRUtil::GetCurrentTimeMillis();
+    unsigned long long currentCheckTime = lastCheckTime;
+
+    while (!m_closed)
+    {
+        currentCheckTime = KPRUtil::GetCurrentTimeMillis();
+        unsigned int elapse = (unsigned int)(currentCheckTime - lastCheckTime);
+
+        std::list<TimerInfo> timeList;
+
+        CheckTimeOut(elapse, timeList);
+
+        if (!timeList.empty())
+        {
+            std::list<TimerInfo>::iterator it = timeList.begin();
+            for (; it != timeList.end(); it++)
+            {
+               try
+               {
+                       it->pTimerHandler->OnTimeOut(it->id);
+                }
+                catch(...)
+                {
+                       RMQ_ERROR("TimerThread[%s] OnTimeOut exception", 
GetName());
+                }
+            }
+        }
+
+        unsigned long long checkEndTime = KPRUtil::GetCurrentTimeMillis();
+        int sleepTime = m_checkInterval - (int)(checkEndTime - 
currentCheckTime);
+        if (sleepTime < 0)
+        {
+            sleepTime = 0;
+        }
+
+        lastCheckTime = currentCheckTime;
+
+        try
+        {
+            kpr::ScopedLock<kpr::Monitor> lock(*this);
+            Wait(sleepTime);
+        }
+        catch (...)
+        {
+        }
+    }
+}
+
+void TimerThread::Stop()
+{
+    m_closed = true;
+    kpr::ScopedLock<kpr::Monitor> lock(*this);
+    Notify();
+}
+
+unsigned int TimerThread::RegisterTimer(unsigned int initialDelay, unsigned 
int elapse, TimerHandler* pHandler, bool persistent)
+{
+    TimerInfo info;
+    info.elapse = elapse;
+    info.outTime = elapse - initialDelay;
+    info.pTimerHandler = pHandler;
+    info.persistent = persistent;
+
+    kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
+    info.id = GetNextTimerID();
+    m_timers[info.id] = info;
+
+    return info.id;
+}
+
+bool TimerThread::UnRegisterTimer(unsigned int timerId)
+{
+    bool result = false;
+    kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
+    std::map<unsigned int, TimerInfo>::iterator it = m_timers.find(timerId);
+    if (it != m_timers.end())
+    {
+        m_timers.erase(it);
+        result = true;
+    }
+
+    return result;
+}
+
+bool TimerThread::ResetTimer(unsigned int timerId)
+{
+    bool result = false;
+    kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
+    std::map<unsigned int, TimerInfo>::iterator it = m_timers.find(timerId);
+    if (it != m_timers.end())
+    {
+        if (it->second.persistent)
+        {
+            it->second.outTime = it->second.elapse;
+        }
+        else
+        {
+            it->second.outTime = 0;
+        }
+
+        result = true;
+    }
+
+    return result;
+}
+
+bool TimerThread::CheckTimeOut(unsigned int elapse, std::list<TimerInfo>& 
timerList)
+{
+    bool result = false;
+    timerList.clear();
+
+    kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
+    if (!m_timers.empty())
+    {
+        std::map<unsigned int, TimerInfo>::iterator it = m_timers.begin();
+        while (it != m_timers.end())
+        {
+            it->second.outTime += elapse;
+
+            if (it->second.outTime >= int(it->second.elapse))
+            {
+                timerList.push_back(it->second);
+
+                if (it->second.persistent)
+                {
+                    it->second.outTime = 0;
+                    ++it;
+                }
+                else
+                {
+                    std::map<unsigned int, TimerInfo>::iterator it1 = it;
+                    ++it;
+                    m_timers.erase(it1);
+                }
+            }
+            else
+            {
+                ++it;
+            }
+        }
+
+        result = true;
+    }
+
+    return result;
+}
+
+unsigned int TimerThread::GetNextTimerID()
+{
+    return ++s_nextTimerID;
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/TimerThread.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/kpr/TimerThread.h 
b/rocketmq-client4cpp/src/kpr/TimerThread.h
new file mode 100755
index 0000000..7e02a79
--- /dev/null
+++ b/rocketmq-client4cpp/src/kpr/TimerThread.h
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __KPR_TIMERTHREAD_H__
+#define __KPR_TIMERTHREAD_H__
+
+#include <list>
+#include <map>
+
+#include "RocketMQClient.h"
+#include "Thread.h"
+#include "Mutex.h"
+#include "Monitor.h"
+
+namespace kpr
+{
+class TimerHandler
+{
+public:
+    TimerHandler()
+    {
+    }
+
+    virtual ~TimerHandler()
+    {
+    }
+
+    virtual void OnTimeOut(unsigned int timerID) = 0;
+};
+
+typedef struct tagTimerlnfo
+{
+    unsigned int id;
+    unsigned int elapse;
+    int outTime;
+    bool persistent;
+    TimerHandler* pTimerHandler;
+} TimerInfo;
+
+
+class TimerThread : public kpr::Thread, public kpr::Monitor
+{
+public:
+    TimerThread(const char* name, unsigned int checklnterval);
+    virtual ~TimerThread();
+    virtual void Run();
+    virtual void Stop();
+
+    virtual unsigned int RegisterTimer(unsigned int initialDelay, unsigned int 
elapse, TimerHandler* pHandler, bool persistent = true);
+    virtual bool UnRegisterTimer(unsigned int timerId);
+    virtual bool ResetTimer(unsigned int timerId);
+
+private:
+    bool CheckTimeOut(unsigned int elapse, std::list<TimerInfo>& timerList);
+    static unsigned int GetNextTimerID();
+
+private:
+    static unsigned int s_nextTimerID;
+    std::map<unsigned int, TimerInfo> m_timers;
+    kpr::Mutex m_mutex;
+    bool m_closed;
+    unsigned int m_checkInterval;
+};
+typedef kpr::RefHandleT<TimerThread> TimerThreadPtr;
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/message/Message.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/message/Message.cpp 
b/rocketmq-client4cpp/src/message/Message.cpp
new file mode 100755
index 0000000..db88c3e
--- /dev/null
+++ b/rocketmq-client4cpp/src/message/Message.cpp
@@ -0,0 +1,379 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,[email protected]
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "Message.h"
+
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include "UtilAll.h"
+
+
+namespace rmq
+{
+
+const std::string Message::PROPERTY_KEYS = "KEYS";
+const std::string Message::PROPERTY_TAGS = "TAGS";
+const std::string Message::PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
+const std::string Message::PROPERTY_DELAY_TIME_LEVEL = "DELAY";
+const std::string Message::PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
+const std::string Message::PROPERTY_REAL_TOPIC = "REAL_TOPIC";
+const std::string Message::PROPERTY_REAL_QUEUE_ID = "REAL_QID";
+const std::string Message::PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
+const std::string Message::PROPERTY_PRODUCER_GROUP = "PGROUP";
+const std::string Message::PROPERTY_MIN_OFFSET = "MIN_OFFSET";
+const std::string Message::PROPERTY_MAX_OFFSET = "MAX_OFFSET";
+const std::string Message::PROPERTY_BUYER_ID = "BUYER_ID";
+const std::string Message::PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
+const std::string Message::PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
+const std::string Message::PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
+const std::string Message::PROPERTY_MQ2_FLAG = "MQ2_FLAG";
+const std::string Message::PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
+const std::string Message::PROPERTY_MSG_REGION = "MSG_REGION";
+const std::string Message::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
+const std::string Message::PROPERTY_MAX_RECONSUME_TIMES = 
"MAX_RECONSUME_TIMES";
+const std::string Message::PROPERTY_CONSUME_START_TIMESTAMP = 
"CONSUME_START_TIME";
+const std::string Message::KEY_SEPARATOR = " ";
+
+Message::Message()
+{
+    Init("", "", "", 0, NULL, 0, true);
+}
+
+Message::Message(const std::string& topic, const char* body, int len)
+{
+    Init(topic, "", "", 0, body, len, true);
+}
+
+Message::Message(const std::string& topic, const std::string& tags, const 
char* body, int len)
+{
+    Init(topic, tags, "", 0, body, len, true);
+}
+
+Message::Message(const std::string& topic, const std::string& tags, const 
std::string& keys, const char* body, int len)
+{
+    Init(topic, tags, keys, 0, body, len, true);
+}
+
+Message::Message(const std::string& topic,
+                 const std::string& tags,
+                 const std::string& keys,
+                 const int  flag,
+                 const char* body,
+                 int len,
+                 bool waitStoreMsgOK)
+{
+    Init(topic, tags, keys, flag, body, len, waitStoreMsgOK);
+}
+
+Message::~Message()
+{
+       if (m_body)
+       {
+       free(m_body);
+       m_body = NULL;
+       m_bodyLen = 0;
+    }
+
+    if (m_compressBody)
+       {
+       free(m_compressBody);
+       m_compressBody = NULL;
+       m_compressBodyLen = 0;
+    }
+}
+
+Message::Message(const Message& other)
+{
+    m_body = (char*)malloc(other.m_bodyLen);
+    m_bodyLen = other.m_bodyLen;
+       memcpy(m_body, other.m_body, other.m_bodyLen);
+
+    m_compressBody = NULL;
+    m_compressBodyLen = 0;
+
+    m_topic = other.m_topic;
+    m_flag = other.m_flag;
+    m_properties = other.m_properties;
+}
+
+Message& Message::operator=(const Message& other)
+{
+    if (this != &other)
+    {
+       if (m_body)
+               {
+               free(m_body);
+               m_body = NULL;
+               m_bodyLen = 0;
+        }
+
+        if (m_compressBody)
+               {
+               free(m_compressBody);
+               m_compressBody = NULL;
+               m_compressBodyLen = 0;
+           }
+
+        m_body = (char*)malloc(other.m_bodyLen);;
+        m_bodyLen = other.m_bodyLen;
+        memcpy(m_body, other.m_body, other.m_bodyLen);
+
+        m_topic = other.m_topic;
+        m_flag = other.m_flag;
+        m_properties = other.m_properties;
+    }
+
+    return *this;
+}
+
+void Message::clearProperty(const std::string& name)
+{
+    m_properties.erase(name);
+}
+
+void Message::putProperty(const std::string& name, const std::string& value)
+{
+    m_properties[name] = value;
+}
+
+std::string Message::getProperty(const std::string& name)
+{
+    std::map<std::string, std::string>::const_iterator it = 
m_properties.find(name);
+    return (it == m_properties.end()) ? "" : it->second;
+}
+
+std::string Message::getTopic()const
+{
+    return m_topic;
+}
+
+void Message::setTopic(const std::string& topic)
+{
+    m_topic = topic;
+}
+
+std::string Message::getTags()
+{
+    return getProperty(PROPERTY_TAGS);
+}
+
+void Message::setTags(const std::string& tags)
+{
+    putProperty(PROPERTY_TAGS, tags);
+}
+
+std::string Message::getKeys()
+{
+    return getProperty(PROPERTY_KEYS);
+}
+
+void Message::setKeys(const std::string& keys)
+{
+    putProperty(PROPERTY_KEYS, keys);
+}
+
+void Message::setKeys(const std::list<std::string> keys)
+{
+    if (keys.empty())
+    {
+        return;
+    }
+
+    std::list<std::string>::const_iterator it = keys.begin();
+    std::string str;
+    str += *it;
+    it++;
+
+    for (; it != keys.end(); it++)
+    {
+        str += KEY_SEPARATOR;
+        str += *it;
+    }
+
+    setKeys(str);
+}
+
+int Message::getDelayTimeLevel()
+{
+    std::string tmp = getProperty(PROPERTY_DELAY_TIME_LEVEL);
+    if (!tmp.empty())
+    {
+        return atoi(tmp.c_str());
+    }
+
+    return 0;
+}
+
+void Message::setDelayTimeLevel(int level)
+{
+    char tmp[16];
+    snprintf(tmp, sizeof(tmp), "%d", level);
+
+    putProperty(PROPERTY_DELAY_TIME_LEVEL, tmp);
+}
+
+bool Message::isWaitStoreMsgOK()
+{
+    std::string tmp = getProperty(PROPERTY_WAIT_STORE_MSG_OK);
+    if (tmp.empty())
+    {
+        return true;
+    }
+    else
+    {
+        return (tmp == "true") ? true : false;
+    }
+}
+
+void Message::setWaitStoreMsgOK(bool waitStoreMsgOK)
+{
+    if (waitStoreMsgOK)
+    {
+        putProperty(PROPERTY_WAIT_STORE_MSG_OK, "true");
+    }
+    else
+    {
+        putProperty(PROPERTY_WAIT_STORE_MSG_OK, "false");
+    }
+}
+
+int Message::getFlag()
+{
+    return m_flag;
+}
+
+void Message::setFlag(int flag)
+{
+    m_flag = flag;
+}
+
+const char* Message::getBody()const
+{
+    return m_body;
+}
+
+int Message::getBodyLen()const
+{
+    return m_bodyLen;
+}
+
+void Message::setBody(const char* body, int len)
+{
+    if (len > 0)
+    {
+       if (m_body)
+       {
+               free(m_body);
+               m_body = NULL;
+               m_bodyLen = 0;
+       }
+
+        m_body = (char*)malloc(len);
+        m_bodyLen = len;
+        memcpy(m_body, body, len);
+    }
+}
+
+bool Message::tryToCompress(int compressLevel)
+{
+    if (m_body != NULL)
+    {
+       if (m_compressBody)
+               {
+               free(m_compressBody);
+               m_compressBody = NULL;
+               m_compressBodyLen = 0;
+           }
+
+        unsigned char* pOut;
+        int outLen = 0;
+        if (UtilAll::compress(m_body, m_bodyLen, &pOut, &outLen, 
compressLevel))
+        {
+            m_compressBody = (char*)pOut;
+            m_compressBodyLen = outLen;
+            return true;
+        }
+    }
+
+    return false;
+}
+
+
+const char* Message::getCompressBody() const
+{
+    return m_compressBody;
+}
+
+int Message::getCompressBodyLen() const
+{
+    return m_compressBodyLen;
+}
+
+
+
+std::map<std::string, std::string>& Message::getProperties()
+{
+    return m_properties;
+}
+
+void Message::setProperties(const std::map<std::string, std::string>& 
properties)
+{
+    m_properties = properties;
+}
+
+void Message::Init(const std::string& topic, const std::string& tags, const 
std::string& keys, const int flag, const char* body, int len, bool 
waitStoreMsgOK)
+{
+    m_topic = topic;
+    m_flag = flag;
+
+    m_body = NULL;
+    m_bodyLen = len;
+
+    m_compressBody = NULL;
+    m_compressBodyLen = 0;
+
+    if (len > 0)
+    {
+        m_body = (char*)malloc(len);
+        memcpy(m_body, body, len);
+    }
+
+    if (tags.length() > 0)
+    {
+        setTags(tags);
+    }
+
+    if (keys.length() > 0)
+    {
+        setKeys(keys);
+    }
+
+    setWaitStoreMsgOK(waitStoreMsgOK);
+}
+
+std::string Message::toString() const
+{
+       std::stringstream ss;
+    ss << "{m_topic=" << m_topic
+       << ",m_flag=" << m_flag
+       << ",properties=" << UtilAll::toString(m_properties)
+       << ",m_bodyLen=" << m_bodyLen
+       << "}";
+    return ss.str();
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/message/MessageDecoder.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/message/MessageDecoder.cpp 
b/rocketmq-client4cpp/src/message/MessageDecoder.cpp
new file mode 100755
index 0000000..338121e
--- /dev/null
+++ b/rocketmq-client4cpp/src/message/MessageDecoder.cpp
@@ -0,0 +1,366 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "MessageDecoder.h"
+
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sstream>
+#include "MessageExt.h"
+#include "MessageSysFlag.h"
+#include "UtilAll.h"
+
+namespace rmq
+{
+
+const char MessageDecoder::NAME_VALUE_SEPARATOR = 1;
+const char MessageDecoder::PROPERTY_SEPARATOR = 2;
+const int MessageDecoder::MSG_ID_LENGTH = 8 + 8;
+
+int MessageDecoder::MessageMagicCodePostion = 4;
+int MessageDecoder::MessageFlagPostion = 16;
+int MessageDecoder::MessagePhysicOffsetPostion = 28;
+int MessageDecoder::MessageStoreTimestampPostion = 56;
+
+std::string MessageDecoder::createMessageId(sockaddr& addr, long long offset)
+{
+    struct sockaddr_in sa;
+    memcpy(&sa, &addr, sizeof(sockaddr));
+    sa.sin_family = AF_INET;
+
+    int port = ntohs(sa.sin_port);
+    port = htonl(port);
+    int ip = sa.sin_addr.s_addr;
+
+    unsigned char* buf = new unsigned char[MSG_ID_LENGTH];
+    offset = h2nll(offset);
+    memcpy(buf, &ip, 4);
+    memcpy(buf + 4, &port, 4);
+    memcpy(buf + 8, &offset, 8);
+
+    char* str = new char[2 * MSG_ID_LENGTH + 1];
+    memset(str, 0, 2 * MSG_ID_LENGTH + 1);
+
+    for (int i = 0; i < MSG_ID_LENGTH; i++)
+    {
+        char tmp[3];
+        tmp[2] = 0;
+
+        snprintf(tmp, sizeof(tmp), "%02X", buf[i]);
+        strncat(str, tmp, sizeof(tmp));
+    }
+
+    std::string ret = str;
+
+    delete[] buf;
+    delete[] str;
+
+    return ret;
+}
+
+MessageId MessageDecoder::decodeMessageId(const std::string& msgId)
+{
+    std::string ipstr = msgId.substr(0, 8);
+    std::string portstr = msgId.substr(8, 8);
+    std::string offsetstr = msgId.substr(16);
+
+    char* end;
+    int ipint = strtoul(ipstr.c_str(), &end, 16);
+    int portint = strtoul(portstr.c_str(), &end, 16);
+
+    long long offset = UtilAll::hexstr2ull(offsetstr.c_str());
+
+    offset = n2hll(offset);
+
+    portint = ntohl(portint);
+    short port = portint;
+
+    struct sockaddr_in sa;
+    sa.sin_family = AF_INET;
+    sa.sin_port = htons(port);
+    sa.sin_addr.s_addr = ipint;
+
+    sockaddr addr;
+    memcpy(&addr, &sa, sizeof(sockaddr));
+
+    MessageId id(addr, offset);
+
+    return id;
+}
+
+MessageExt* MessageDecoder::decode(const char* pData, int len, int& offset)
+{
+    return decode(pData, len, offset, true);
+}
+
+MessageExt* MessageDecoder::decode(const char* pData, int len, int& offset, 
bool readBody)
+{
+       MessageExt* msgExt = NULL;
+
+    try
+    {
+        msgExt = new MessageExt();
+
+        // 1 TOTALSIZE
+        int storeSize;
+        memcpy(&storeSize, pData, 4);
+        storeSize = ntohl(storeSize);
+
+        msgExt->setStoreSize(storeSize);
+
+        // 2 MAGICCODE sizeof(int)
+
+        // 3 BODYCRC
+        int bodyCRC;
+        memcpy(&bodyCRC, pData + 2 * sizeof(int), 4);
+        bodyCRC = ntohl(bodyCRC);
+        msgExt->setBodyCRC(bodyCRC);
+
+        // 4 QUEUEID
+        int queueId;
+        memcpy(&queueId, pData + 3 * sizeof(int), 4);
+        queueId = ntohl(queueId);
+        msgExt->setQueueId(queueId);
+
+        // 5 FLAG
+        int flag ;
+
+        memcpy(&flag, pData + 4 * sizeof(int), 4);
+        flag = ntohl(flag);
+
+        msgExt->setFlag(flag);
+
+        // 6 QUEUEOFFSET
+        long long queueOffset;
+        memcpy(&queueOffset, pData + 5 * sizeof(int), 8);
+        queueOffset = n2hll(queueOffset);
+        msgExt->setQueueOffset(queueOffset);
+
+        // 7 PHYSICALOFFSET
+        long long physicOffset;
+
+        memcpy(&physicOffset, pData + 7 * sizeof(int), 8);
+        physicOffset = n2hll(physicOffset);
+        msgExt->setCommitLogOffset(physicOffset);
+
+        // 8 SYSFLAG
+        int sysFlag;
+
+        memcpy(&sysFlag, pData + 9 * sizeof(int), 4);
+        sysFlag = ntohl(sysFlag);
+        msgExt->setSysFlag(sysFlag);
+
+        // 9 BORNTIMESTAMP
+        long long bornTimeStamp;
+        memcpy(&bornTimeStamp, pData + 10 * sizeof(int), 8);
+        bornTimeStamp = n2hll(bornTimeStamp);
+
+        msgExt->setBornTimestamp(bornTimeStamp);
+
+        // 10 BORNHOST
+        int bornHost;//c0 a8 00 68  192.168.0.104 c0 a8 00 68 00 00 c4 04
+        memcpy(&bornHost, pData + 12 * sizeof(int), 4);
+
+        int port;
+        memcpy(&port, pData + 13 * sizeof(int), 4);
+        port = ntohl(port);
+
+        struct sockaddr_in sa;
+        sa.sin_family = AF_INET;
+        sa.sin_port = htons(port);
+        sa.sin_addr.s_addr = bornHost;
+
+        sockaddr bornAddr;
+        memcpy(&bornAddr, &sa, sizeof(sockaddr));
+        msgExt->setBornHost(bornAddr);
+
+        // 11 STORETIMESTAMP
+        long long storeTimestamp;
+        memcpy(&storeTimestamp, pData + 14 * sizeof(int), 8);
+        storeTimestamp = n2hll(storeTimestamp);
+        msgExt->setStoreTimestamp(storeTimestamp);
+
+        // 12 STOREHOST
+        int storeHost;
+        memcpy(&storeHost, pData + 16 * sizeof(int), 4);
+        memcpy(&port, pData + 17 * sizeof(int), 4);
+        port = ntohl(port);
+
+        sa.sin_family = AF_INET;
+        sa.sin_port = htons(port);
+        sa.sin_addr.s_addr = storeHost;
+
+        sockaddr storeAddr;
+        memcpy(&storeAddr, &sa, sizeof(sockaddr));
+
+        msgExt->setStoreHost(storeAddr);
+
+        // 13 RECONSUMETIMES
+        int reconsumeTimes;
+        memcpy(&reconsumeTimes, pData + 18 * sizeof(int), 4);
+        reconsumeTimes = ntohl(reconsumeTimes);
+        msgExt->setReconsumeTimes(reconsumeTimes);
+
+        // 14 Prepared Transaction Offset
+        long long preparedTransactionOffset;
+        memcpy(&preparedTransactionOffset, pData + 19 * sizeof(int), 8);
+        preparedTransactionOffset = n2hll(preparedTransactionOffset);
+        msgExt->setPreparedTransactionOffset(preparedTransactionOffset);
+
+        // 15 BODY
+        int bodyLen = 0;
+        memcpy(&bodyLen, pData + 21 * sizeof(int), 4);
+        bodyLen = ntohl(bodyLen);
+
+        if (bodyLen > 0)
+        {
+            if (readBody)
+            {
+                const char* body = pData + 22 * sizeof(int);
+                int newBodyLen = bodyLen;
+
+                // uncompress body
+                if ((sysFlag & MessageSysFlag::CompressedFlag) == 
MessageSysFlag::CompressedFlag)
+                {
+                    unsigned char* pOut;
+                    int outLen;
+
+                    if (UtilAll::decompress(body, bodyLen, &pOut, &outLen))
+                    {
+                        msgExt->setBody((char*)pOut, outLen);
+                        free(pOut);
+                    }
+                    else
+                    {
+                        msgExt->setBody(body, newBodyLen);
+                    }
+                }
+                else
+                {
+                    msgExt->setBody(body, newBodyLen);
+                }
+            }
+            else
+            {
+
+            }
+        }
+
+        // 16 TOPIC
+        int topicLen = *(pData + 22 * sizeof(int) + bodyLen);
+
+        char* tmp = new char[topicLen + 1];
+
+        memcpy(tmp, pData + 22 * sizeof(int) + bodyLen + 1, topicLen);
+        tmp[topicLen] = 0;
+        std::string topic = tmp;
+
+        delete[] tmp;
+
+        msgExt->setTopic(topic);
+
+        // 17 properties
+        short propertiesLength;
+        memcpy(&propertiesLength, pData + 22 * sizeof(int) + bodyLen + 1 + 
topicLen, 2);
+        propertiesLength = ntohs(propertiesLength);
+
+        if (propertiesLength > 0)
+        {
+            char* properties = new char[propertiesLength + 1];
+            memcpy(properties, pData + 22 * sizeof(int) + bodyLen + 1 + 
topicLen + 2, propertiesLength);
+            properties[propertiesLength] = 0;
+            std::string propertiesString = properties;
+            std::map<std::string, std::string> map;
+            string2messageProperties(map, propertiesString);
+            msgExt->setProperties(map);
+            delete[] properties;
+        }
+
+        offset = 22 * sizeof(int) + bodyLen + 1 + topicLen + 2 + 
propertiesLength;
+
+        // ��ϢID
+        std::string msgId =  createMessageId(storeAddr, physicOffset);
+        msgExt->setMsgId(msgId);
+
+        return msgExt;
+    }
+    catch (...)
+    {
+       RMQ_ERROR("decode exception");
+               if (msgExt)
+               {
+                       delete msgExt;
+                       msgExt = NULL;
+               }
+    }
+
+    return NULL;
+}
+
+std::list<MessageExt*> MessageDecoder::decodes(const char* pData, int len)
+{
+    return decodes(pData, len, true);
+}
+
+std::list<MessageExt*> MessageDecoder::decodes(const char* pData, int len, 
bool readBody)
+{
+    std::list<MessageExt*> list;
+
+    int offset = 0;
+    while (offset < len)
+    {
+        int tmp;
+        MessageExt* msg = decode(pData + offset, len, tmp);
+        list.push_back(msg);
+        offset += tmp;
+    }
+
+    return list;
+}
+
+std::string MessageDecoder::messageProperties2String(const 
std::map<std::string, std::string>& properties)
+{
+    std::stringstream ss;
+
+    std::map<std::string, std::string>::const_iterator it = properties.begin();
+
+    for (; it != properties.end(); it++)
+    {
+        ss << it->first << NAME_VALUE_SEPARATOR << it->second << 
PROPERTY_SEPARATOR;
+    }
+
+    return ss.str();
+}
+
+void MessageDecoder::string2messageProperties(std::map<std::string, 
std::string>& properties,
+        std::string& propertiesString)
+{
+    std::vector<std::string> out;
+    UtilAll::Split(out, propertiesString, PROPERTY_SEPARATOR);
+
+    for (size_t i = 0; i < out.size(); i++)
+    {
+        std::vector<std::string> outValue;
+        UtilAll::Split(outValue, out[i], NAME_VALUE_SEPARATOR);
+
+        if (outValue.size() == 2)
+        {
+            properties[outValue[0]] = outValue[1];
+        }
+    }
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/message/MessageDecoder.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/message/MessageDecoder.h 
b/rocketmq-client4cpp/src/message/MessageDecoder.h
new file mode 100755
index 0000000..a5f24ed
--- /dev/null
+++ b/rocketmq-client4cpp/src/message/MessageDecoder.h
@@ -0,0 +1,64 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __MESSAGEDECODER_H__
+#define  __MESSAGEDECODER_H__
+
+#include <string>
+#include <list>
+#include <map>
+
+#include "SocketUtil.h"
+#include "MessageId.h"
+
+namespace rmq
+{
+    class MessageExt;
+    class UnknownHostException;
+
+    /**
+    * Message decoder
+    *
+    */
+    class MessageDecoder
+    {
+    public:
+        static std::string createMessageId(sockaddr& addr, long long offset);
+        static MessageId decodeMessageId(const std::string& msgId);
+
+        static MessageExt* decode(const char* pData, int len, int& offset);
+        static MessageExt* decode(const char* pData, int len, int& offset, 
bool readBody);
+
+        static std::list<MessageExt*> decodes(const char* pData, int len);
+        static std::list<MessageExt*> decodes(const char* pData, int len, bool 
readBody);
+
+        static std::string messageProperties2String(const 
std::map<std::string, std::string>& properties);
+        static void string2messageProperties(std::map<std::string, 
std::string>& properties,
+                                             std::string& propertiesString);
+
+    public:
+        static const char NAME_VALUE_SEPARATOR;
+        static const char PROPERTY_SEPARATOR;
+
+        static const int MSG_ID_LENGTH;
+
+        static int MessageMagicCodePostion;
+        static int MessageFlagPostion;
+        static int MessagePhysicOffsetPostion;
+        static int MessageStoreTimestampPostion;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/message/MessageExt.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/message/MessageExt.cpp 
b/rocketmq-client4cpp/src/message/MessageExt.cpp
new file mode 100755
index 0000000..35479ce
--- /dev/null
+++ b/rocketmq-client4cpp/src/message/MessageExt.cpp
@@ -0,0 +1,244 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "MessageExt.h"
+
+#include <sstream>
+#include "MessageSysFlag.h"
+#include "SocketUtil.h"
+
+namespace rmq
+{
+
+MessageExt::MessageExt()
+    : m_queueOffset(0),
+      m_commitLogOffset(0),
+      m_bornTimestamp(0),
+      m_storeTimestamp(0),
+      m_preparedTransactionOffset(0),
+      m_queueId(0),
+      m_storeSize(0),
+      m_sysFlag(0),
+      m_bodyCRC(0),
+      m_reconsumeTimes(3),
+      m_msgId("")
+{
+}
+
+MessageExt::MessageExt(int queueId,
+                       long long bornTimestamp,
+                       sockaddr bornHost,
+                       long long storeTimestamp,
+                       sockaddr storeHost,
+                       std::string msgId)
+    : m_queueOffset(0),
+      m_commitLogOffset(0),
+      m_bornTimestamp(bornTimestamp),
+      m_storeTimestamp(storeTimestamp),
+      m_preparedTransactionOffset(0),
+      m_queueId(queueId),
+      m_storeSize(0),
+      m_sysFlag(0),
+      m_bodyCRC(0),
+      m_reconsumeTimes(3),
+      m_bornHost(bornHost),
+      m_storeHost(storeHost),
+      m_msgId(msgId)
+{
+
+}
+
+MessageExt::~MessageExt()
+{
+
+}
+
+int MessageExt::getQueueId()
+{
+    return m_queueId;
+}
+
+void MessageExt::setQueueId(int queueId)
+{
+    m_queueId = queueId;
+}
+
+long long MessageExt::getBornTimestamp()
+{
+    return m_bornTimestamp;
+}
+
+void MessageExt::setBornTimestamp(long long bornTimestamp)
+{
+    m_bornTimestamp = bornTimestamp;
+}
+
+sockaddr MessageExt::getBornHost()
+{
+    return m_bornHost;
+}
+
+std::string MessageExt::getBornHostString()
+{
+    return socketAddress2String(m_bornHost);
+}
+
+std::string MessageExt::getBornHostNameString()
+{
+    return getHostName(m_bornHost);
+}
+
+void MessageExt::setBornHost(const sockaddr& bornHost)
+{
+    m_bornHost = bornHost;
+}
+
+long long MessageExt::getStoreTimestamp()
+{
+    return m_storeTimestamp;
+}
+
+void MessageExt::setStoreTimestamp(long long storeTimestamp)
+{
+    m_storeTimestamp = storeTimestamp;
+}
+
+sockaddr MessageExt::getStoreHost()
+{
+    return m_storeHost;
+}
+
+std::string MessageExt::getStoreHostString()
+{
+    return socketAddress2String(m_storeHost);
+}
+
+void MessageExt::setStoreHost(const sockaddr& storeHost)
+{
+    m_storeHost = storeHost;
+}
+
+std::string MessageExt::getMsgId()
+{
+    return m_msgId;
+}
+
+void MessageExt::setMsgId(const std::string& msgId)
+{
+    m_msgId = msgId;
+}
+
+int MessageExt::getSysFlag()
+{
+    return m_sysFlag;
+}
+
+void MessageExt::setSysFlag(int sysFlag)
+{
+    m_sysFlag = sysFlag;
+}
+
+int MessageExt::getBodyCRC()
+{
+    return m_bodyCRC;
+}
+
+void MessageExt::setBodyCRC(int bodyCRC)
+{
+    m_bodyCRC = bodyCRC;
+}
+
+long long MessageExt::getQueueOffset()
+{
+    return m_queueOffset;
+}
+
+void MessageExt::setQueueOffset(long long queueOffset)
+{
+    m_queueOffset = queueOffset;
+}
+
+long long MessageExt::getCommitLogOffset()
+{
+    return m_commitLogOffset;
+}
+
+void MessageExt::setCommitLogOffset(long long physicOffset)
+{
+    m_commitLogOffset = physicOffset;
+}
+
+int MessageExt::getStoreSize()
+{
+    return m_storeSize;
+}
+
+void MessageExt::setStoreSize(int storeSize)
+{
+    m_storeSize = storeSize;
+}
+
+TopicFilterType MessageExt::parseTopicFilterType(int sysFlag)
+{
+    if ((sysFlag & MessageSysFlag::MultiTagsFlag) == 
MessageSysFlag::MultiTagsFlag)
+    {
+        return MULTI_TAG;
+    }
+
+    return SINGLE_TAG;
+}
+
+int MessageExt::getReconsumeTimes()
+{
+    return m_reconsumeTimes;
+}
+
+void MessageExt::setReconsumeTimes(int reconsumeTimes)
+{
+    m_reconsumeTimes = reconsumeTimes;
+}
+
+long long MessageExt::getPreparedTransactionOffset()
+{
+    return  m_preparedTransactionOffset;
+}
+
+void MessageExt::setPreparedTransactionOffset(long long 
preparedTransactionOffset)
+{
+    m_preparedTransactionOffset = preparedTransactionOffset;
+}
+
+std::string MessageExt::toString() const
+{
+    std::stringstream ss;
+    ss << "{msgId=" << m_msgId
+       << ",queueId=" << m_queueId
+       << ",storeSize=" << m_storeSize
+       << ",sysFlag=" << m_sysFlag
+       << ",queueOffset=" << m_queueOffset
+       << ",commitLogOffset=" << m_commitLogOffset
+       << ",preparedTransactionOffset=" << m_preparedTransactionOffset
+       << ",bornTimestamp=" << m_bornTimestamp
+       << ",bornHost=" << socketAddress2String(m_bornHost)
+       << ",storeHost=" << socketAddress2String(m_storeHost)
+       << ",storeTimestamp=" << m_storeTimestamp
+       << ",reconsumeTimes=" << m_reconsumeTimes
+       << ",bodyCRC=" << m_bodyCRC
+       << ",Message=" << Message::toString()
+       << "}";
+    return ss.str();
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/message/MessageId.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/message/MessageId.h 
b/rocketmq-client4cpp/src/message/MessageId.h
new file mode 100644
index 0000000..5237f8d
--- /dev/null
+++ b/rocketmq-client4cpp/src/message/MessageId.h
@@ -0,0 +1,59 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MESSAGEID_H__
+#define __MESSAGEID_H__
+
+#include "SocketUtil.h"
+
+namespace rmq
+{
+    class MessageId
+    {
+    public:
+        MessageId(sockaddr address, long long offset)
+            : m_address(address), m_offset(offset)
+        {
+
+        }
+
+        sockaddr getAddress()
+        {
+            return m_address;
+        }
+
+        void setAddress(sockaddr address)
+        {
+            m_address = address;
+        }
+
+        long long getOffset()
+        {
+            return m_offset;
+        }
+
+        void setOffset(long long offset)
+        {
+            m_offset = offset;
+        }
+
+    private:
+        sockaddr m_address;
+        long long m_offset;
+    };
+}
+
+#endif

Reply via email to