http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/Mutex.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/Mutex.cpp b/rocketmq-client4cpp/src/kpr/Mutex.cpp new file mode 100755 index 0000000..f98282c --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/Mutex.cpp @@ -0,0 +1,296 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "Mutex.h" + +#include <pthread.h> +#include <unistd.h> +#include <stdio.h> +#include <errno.h> +#include <time.h> + + +namespace kpr +{ +Mutex::Mutex() +{ + ::pthread_mutex_init(&m_mutex, NULL); +} + +Mutex::~Mutex() +{ + ::pthread_mutex_destroy(&m_mutex); +} + +void Mutex::Lock() const +{ + ::pthread_mutex_lock(&m_mutex); +} + +bool Mutex::TryLock() const +{ + int ret = ::pthread_mutex_trylock(&m_mutex); + return (ret == 0); +} + +bool Mutex::TryLock(int timeout) const +{ + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += (timeout/1000); + ts.tv_nsec += (timeout%1000) * 1000 * 1000; + + int ret = ::pthread_mutex_timedlock(&m_mutex, &ts); + return (ret == 0); +} + + +void Mutex::Unlock() const +{ + ::pthread_mutex_unlock(&m_mutex); +} + +//*********** +//RWMutex +//*************** +RWMutex::RWMutex() +{ + ::pthread_rwlock_init(&m_mutex, NULL); +} + +RWMutex::~RWMutex() +{ + ::pthread_rwlock_destroy(&m_mutex); +} + +void RWMutex::ReadLock() const +{ + ::pthread_rwlock_rdlock(&m_mutex); +} + +void RWMutex::WriteLock() const +{ + ::pthread_rwlock_wrlock(&m_mutex); +} + +bool RWMutex::TryReadLock() const +{ + int ret = ::pthread_rwlock_tryrdlock(&m_mutex); + return (ret == 0); +} + +bool RWMutex::TryReadLock(int timeout) const +{ + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += (timeout/1000); + ts.tv_nsec += (timeout%1000) * 1000 * 1000; + + int ret = ::pthread_rwlock_timedrdlock(&m_mutex, &ts); + return (ret == 0); +} + +bool RWMutex::TryWriteLock() const +{ + int ret = ::pthread_rwlock_trywrlock(&m_mutex); + return (ret == 0); +} + +bool RWMutex::TryWriteLock(int timeout) const +{ + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += (timeout/1000); + ts.tv_nsec += (timeout%1000) * 1000 * 1000; + + int ret = ::pthread_rwlock_timedwrlock(&m_mutex, &ts); + return (ret == 0); +} + + +void RWMutex::Unlock() const +{ + ::pthread_rwlock_unlock(&m_mutex); +} + + +//*********** 
+//RecursiveMutex +//*************** +RecursiveMutex::RecursiveMutex() + : m_count(0), + m_owner(ThreadId()) +{ + ::pthread_mutex_init(&m_mutex, NULL); +} + +RecursiveMutex::~RecursiveMutex() +{ + ::pthread_mutex_destroy(&m_mutex); +} + +bool RecursiveMutex::Lock()const +{ + return ((RecursiveMutex*)this)->lock(1); +} + +bool RecursiveMutex::Unlock()const +{ + return ((RecursiveMutex*)this)->unlock(); +} + +bool RecursiveMutex::TryLock()const +{ + return ((RecursiveMutex*)this)->tryLock(); +} + +ThreadId RecursiveMutex::GetOwner()const +{ + m_internal.Lock(); + ThreadId id; + if (m_count > 0) + { + id = m_owner; + } + m_internal.Unlock(); + + return id; +} + +bool RecursiveMutex::lock(int count) +{ + bool rc = false; + bool obtained = false; + + while (!obtained) + { + m_internal.Lock(); + + if (m_count == 0) + { + m_count = count; + m_owner = ThreadId::GetCurrentThreadId(); + obtained = true; + rc = true; + + try + { + ::pthread_mutex_lock(&m_mutex); + } + catch (...) + { + try + { + m_internal.Unlock(); + } + catch (...) + { + } + throw; + } + } + else if (m_owner == ThreadId::GetCurrentThreadId()) + { + m_count += count; + obtained = true; + } + + m_internal.Unlock(); + + if (!obtained) + { + ::pthread_mutex_lock(&m_mutex); + ::pthread_mutex_unlock(&m_mutex); + } + } + + return rc; +} + +bool RecursiveMutex::tryLock() +{ + bool obtained = false; + + m_internal.Lock(); + + if (m_count == 0) + { + m_count = 1; + m_owner = ThreadId::GetCurrentThreadId(); + obtained = true; + + try + { + ::pthread_mutex_lock(&m_mutex); + } + catch (...) + { + try + { + m_internal.Unlock(); + } + catch (...) 
+ { + } + throw; + } + } + else if (m_owner == ThreadId::GetCurrentThreadId()) + { + ++m_count; + obtained = true; + } + + m_internal.Unlock(); + + return obtained; +} + +bool RecursiveMutex::unlock() +{ + bool rc; + m_internal.Lock(); + + if (--m_count == 0) + { + m_owner = ThreadId(); + + ::pthread_mutex_unlock(&m_mutex); + + rc = true; + } + else + { + rc = false; + } + + m_internal.Unlock(); + + return rc; +} + +unsigned int RecursiveMutex::reset4Condvar() +{ + m_internal.Lock(); + + unsigned int count = m_count; + m_count = 0; + m_owner = ThreadId(); + + m_internal.Unlock(); + + return count; +} +}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/Mutex.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/Mutex.h b/rocketmq-client4cpp/src/kpr/Mutex.h new file mode 100755 index 0000000..fc3498f --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/Mutex.h @@ -0,0 +1,107 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __KPR_MUTEX_H__ +#define __KPR_MUTEX_H__ + +#include "KPRTypes.h" +#include <errno.h> + +namespace kpr +{ +class Mutex +{ +public: + Mutex(); + ~Mutex(); + + void Lock()const; + void Unlock()const; + bool TryLock()const; + bool TryLock(int timeout) const; + + ThreadId GetOwner()const; + +private: + Mutex(const Mutex&); + const Mutex& operator=(const Mutex&); + + mutable pthread_mutex_t m_mutex; + friend class Condition; +}; + +class RWMutex +{ +public: + RWMutex(); + ~RWMutex(); + + void ReadLock()const; + void WriteLock()const; + bool TryReadLock()const; + bool TryReadLock(int timeout) const; + bool TryWriteLock()const; + bool TryWriteLock(int timeout)const; + void Unlock()const; + + ThreadId GetOwner()const; + +private: + RWMutex(const RWMutex&); + const RWMutex& operator=(const RWMutex&); + + mutable pthread_rwlock_t m_mutex; + friend class Condition; +}; + +class RecursiveMutex +{ +public: + RecursiveMutex(); + ~RecursiveMutex(); + + bool Lock()const; + bool Unlock()const; + bool TryLock()const; + + ThreadId GetOwner()const; + + unsigned int GetCount()const + { + return m_count; + } + +private: + RecursiveMutex(const RecursiveMutex&); + + const RecursiveMutex& operator=(const RecursiveMutex&); + + bool lock(int count); + bool tryLock(); + bool unlock(); + + unsigned int reset4Condvar(); + +private: + pthread_mutex_t m_mutex; + Mutex m_internal; + mutable unsigned int m_count; + mutable ThreadId m_owner; + + friend class Condition; + friend class ConditionHelper; +}; +} +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/RefHandle.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/RefHandle.h b/rocketmq-client4cpp/src/kpr/RefHandle.h new file mode 100644 index 0000000..fd7d741 --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/RefHandle.h @@ -0,0 +1,328 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * 
Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __KPR_REFHANDLET_H__ +#define __KPR_REFHANDLET_H__ + +#include "KPRTypes.h" +#include "AtomicValue.h" +#include "Exception.h" + +namespace kpr +{ + +class RefCount +{ +public: + RefCount& operator=(const RefCount&) + { + return *this; + } + + void incRef() + { + m_refCount++; + } + + void decRef() + { + if (--m_refCount == 0 && !m_noDelete) + { + m_noDelete = true; + delete this; + } + } + + int getRef() const + { + return m_refCount.get(); + } + + void setNoDelete(bool b) + { + m_noDelete = b; + } + +protected: + RefCount() + : m_refCount(0), m_noDelete(false) + { + } + + RefCount(const RefCount&) + : m_refCount(0), m_noDelete(false) + { + } + + virtual ~RefCount() + { + } + +protected: + AtomicInteger m_refCount; + bool m_noDelete; +}; + + + +template <class T> +class RefHandleT +{ +public: + RefHandleT(T* p = 0) + { + m_ptr = p; + + if (m_ptr) + { + m_ptr->incRef(); + } + } + + template<typename Y> + RefHandleT(const RefHandleT<Y>& v) + { + m_ptr = v.m_ptr; + + if (m_ptr) + { + m_ptr->incRef(); + } + } + + RefHandleT(const RefHandleT& v) + { + m_ptr = v.m_ptr; + + if (m_ptr) + { + m_ptr->incRef(); + } + } + + ~RefHandleT() + { + if (m_ptr) + { + m_ptr->decRef(); + } + } + + RefHandleT<T>& operator=(T* p) + { + if (m_ptr != p) + { + if (p) + { + p->incRef(); + } + + T* ptr = m_ptr; + m_ptr = p; + + if (ptr) + { + ptr->decRef(); + } + } + + return *this; + } + + template<typename Y> + RefHandleT<T>& 
operator=(const RefHandleT<Y>& v) + { + if (m_ptr != v.m_ptr) + { + if (v.m_ptr) + { + v.m_ptr->incRef(); + } + + T* ptr = m_ptr; + m_ptr = v.m_ptr; + + if (ptr) + { + ptr->decRef(); + } + } + + return *this; + } + + RefHandleT<T>& operator=(const RefHandleT<T>& v) + { + if (m_ptr != v.m_ptr) + { + if (v.m_ptr) + { + v.m_ptr->incRef(); + } + + T* ptr = m_ptr; + m_ptr = v.m_ptr; + + if (ptr) + { + ptr->decRef(); + } + } + + return *this; + } + + T* operator->() const + { + if (!m_ptr) + { + THROW_EXCEPTION(RefHandleNullException, "autoptr null handle error", -1); + } + + return m_ptr; + } + + T& operator*() const + { + if (!m_ptr) + { + THROW_EXCEPTION(RefHandleNullException, "autoptr null handle error", -1); + } + + return *m_ptr; + } + + operator T* () const + { + return m_ptr; + } + + T* ptr() const + { + return m_ptr; + } + + T* retn() + { + T* p = m_ptr; + m_ptr = 0; + + return p; + } + + bool operator==(const RefHandleT<T>& v) const + { + return m_ptr == v.m_ptr; + } + + bool operator==(T* p) const + { + return m_ptr == p; + } + + bool operator!=(const RefHandleT<T>& v) const + { + return m_ptr != v.m_ptr; + } + + bool operator!=(T* p) const + { + return m_ptr != p; + } + + bool operator!() const + { + return m_ptr == 0; + } + + operator bool() const + { + return m_ptr != 0; + } + + void swap(RefHandleT& other) + { + std::swap(m_ptr, other._ptr); + } + + template<class Y> + static RefHandleT dynamicCast(const RefHandleT<Y>& r) + { + return RefHandleT(dynamic_cast<T*>(r._ptr)); + } + + template<class Y> + static RefHandleT dynamicCast(Y* p) + { + return RefHandleT(dynamic_cast<T*>(p)); + } + +public: + T* m_ptr; +}; + + +template<typename T, typename U> +inline bool operator==(const RefHandleT<T>& lhs, const RefHandleT<U>& rhs) +{ + T* l = lhs.ptr(); + U* r = rhs.ptr(); + if(l && r) + { + return *l == *r; + } + else + { + return !l && !r; + } +} + + +template<typename T, typename U> +inline bool operator!=(const RefHandleT<T>& lhs, const RefHandleT<U>& rhs) +{ 
+ T* l = lhs.ptr(); + U* r = rhs.ptr(); + if(l && r) + { + return *l != *r; + } + else + { + return l || r; + } +} + + +template<typename T, typename U> +inline bool operator<(const RefHandleT<T>& lhs, const RefHandleT<U>& rhs) +{ + T* l = lhs.ptr(); + U* r = rhs.ptr(); + if(l && r) + { + return *l < *r; + } + else + { + return !l && r; + } +} + +} + +#define DECLAREVAR(T) typedef kpr::RefHandleT<T> T ## Ptr; + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/ScopedLock.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/ScopedLock.h b/rocketmq-client4cpp/src/kpr/ScopedLock.h new file mode 100755 index 0000000..6ff9dd1 --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/ScopedLock.h @@ -0,0 +1,91 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __KPR_SCOPEDLOCK_H__ +#define __KPR_SCOPEDLOCK_H__ + +namespace kpr +{ +template <class T> +class ScopedLock +{ +public: + ScopedLock(const T& mutex) + : m_mutex(mutex) + { + m_mutex.Lock(); + } + + ~ScopedLock() + { + m_mutex.Unlock(); + } + +private: + const T& m_mutex; +}; + + +template <class T> +class ScopedRLock +{ +public: + ScopedRLock(const T& mutex) + : m_mutex(mutex) + { + m_mutex.ReadLock(); + m_acquired = true; + } + + ~ScopedRLock() + { + if (m_acquired) + { + m_mutex.Unlock(); + } + } + +private: + const T& m_mutex; + mutable bool m_acquired; +}; + + +template <class T> +class ScopedWLock +{ +public: + ScopedWLock(const T& mutex) + : m_mutex(mutex) + { + m_mutex.WriteLock(); + m_acquired = true; + } + + ~ScopedWLock() + { + if (m_acquired) + { + m_mutex.Unlock(); + } + } + +private: + const T& m_mutex; + mutable bool m_acquired; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/Semaphore.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/Semaphore.cpp b/rocketmq-client4cpp/src/kpr/Semaphore.cpp new file mode 100755 index 0000000..59a0eef --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/Semaphore.cpp @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "Semaphore.h" + +#include <unistd.h> +#include <sys/time.h> +#include "KPRUtil.h" + +namespace kpr +{ +Semaphore::Semaphore(long initial_count) +{ + sem_init(&m_sem, 0, initial_count); +} + +Semaphore::~Semaphore() +{ + sem_destroy(&m_sem); +} + +int Semaphore::GetValue() +{ + int value = 0; + int rc = sem_getvalue(&m_sem, &value); + if (rc < 0) + { + return rc; + } + return value; +} + +bool Semaphore::Wait() +{ + int rc; + rc = sem_wait(&m_sem); + return !rc; +} + +bool Semaphore::Wait(long timeout) +{ + int rc; + if (timeout < 0) + { + rc = sem_wait(&m_sem); + } + else + { + struct timespec abstime = KPRUtil::CalcAbsTime(timeout); + rc = sem_timedwait(&m_sem, &abstime); + } + + return !rc; +} + +void Semaphore::Release(int count) +{ + sem_post(&m_sem); +} +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/Semaphore.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/Semaphore.h b/rocketmq-client4cpp/src/kpr/Semaphore.h new file mode 100755 index 0000000..2a1af7f --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/Semaphore.h @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __KPR_SEMAPHORE_H__ +#define __KPR_SEMAPHORE_H__ + +#include "KPRTypes.h" +#include <errno.h> + +namespace kpr +{ + +class Semaphore +{ +public: + Semaphore(long initial_count = 0); + ~Semaphore(); + + int GetValue(); + bool Wait(); + bool Wait(long timeout); + + void Release(int count = 1); + +private: + sem_t m_sem; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/Thread.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/Thread.cpp b/rocketmq-client4cpp/src/kpr/Thread.cpp new file mode 100755 index 0000000..d80819b --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/Thread.cpp @@ -0,0 +1,191 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "Thread.h" + +#include <string.h> +#include <stdlib.h> +#include <stdio.h> +#include <errno.h> +#include <assert.h> +#include <unistd.h> +#include <sys/types.h> +#include <signal.h> + +#include "ScopedLock.h" +#include "Exception.h" + +//for log +#include "RocketMQClient.h" + +namespace kpr +{ +kpr::AtomicInteger Thread::s_threadNumber = 0; + +void* Thread::ThreadRoute(void* pArg) +{ + Thread* tv = ((Thread*)pArg); + + try + { + tv->Startup(); + } + catch (...) + { + } + + try + { + tv->Cleanup(); + } + catch (...) 
+ { + } + + return 0; +} + +Thread::Thread(const char* name) +{ + m_started = false; + m_threadId = ThreadId(); + m_threadNumber = s_threadNumber++; + + SetName(name); +} + +Thread::~Thread() +{ + try + { + } + catch (...) + { + } +} + +void Thread::SetName(const char* name) +{ + ScopedLock<Mutex> guard(m_mutex); + + if (name == NULL) + { + snprintf(m_name, sizeof(m_name), "Thread-%u", m_threadNumber); + } + else + { + snprintf(m_name, sizeof(m_name), "%s", name); + } +} + +const char* Thread::GetName() const +{ + ScopedLock<Mutex> guard(m_mutex); + return m_name; +} + +void Thread::Start() +{ + ScopedLock<Mutex> guard(m_mutex); + + if (m_started) + { + return; + } + + pthread_attr_t attr; + int retcode = 0; + retcode = pthread_attr_init(&attr); + if (retcode != 0) + { + THROW_EXCEPTION(SystemCallException, "pthread_attr_init failed!", errno) + } + + pthread_t id; + retcode = pthread_create(&id, &attr, ThreadRoute, (void*)this); + if (retcode != 0) + { + THROW_EXCEPTION(SystemCallException, "pthread_create error", errno) + } + + m_threadId = id; + pthread_attr_destroy(&attr); + m_started = true; + RMQ_DEBUG("thread[%s][%ld] start successfully", m_name, (long)id); +} + +void Thread::Run() +{ + //TODO support runable +} + +bool Thread::IsAlive() const +{ + if (m_started) + { + int retcode = pthread_kill(m_threadId, 0); + return (retcode == ESRCH); + } + + return false; +} + +void Thread::Join() +{ + if (m_started) + { + pthread_join(m_threadId, NULL); + } +} + +void Thread::Sleep(long millis, int nanos) +{ + assert(millis >= 0 && nanos >= 0 && nanos < 999999); + struct timespec tv; + tv.tv_sec = millis / 1000; + tv.tv_nsec = (millis % 1000) * 1000000 + nanos; + nanosleep(&tv, 0); +} + +void Thread::Yield() +{ + pthread_yield(); +} + +ThreadId Thread::GetId() const +{ + ScopedLock<Mutex> guard(m_mutex); + return m_threadId; +} + +void Thread::Startup() +{ + try + { + RMQ_INFO("thread[%s] started", GetName()); + Run(); + } + catch (...) 
+ { + } +} + +void Thread::Cleanup() +{ + RMQ_INFO("thread[%s] end", GetName()); +} + +} + http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/Thread.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/Thread.h b/rocketmq-client4cpp/src/kpr/Thread.h new file mode 100755 index 0000000..ef2590e --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/Thread.h @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __KPR_THREAD_H__ +#define __KPR_THREAD_H__ + +#include "KPRTypes.h" +#include "RefHandle.h" +#include "Mutex.h" + +#ifdef Yield +#undef Yield +#endif + +namespace kpr +{ +class Thread : public virtual kpr::RefCount +{ +public: + Thread(const char* name = NULL); + virtual ~Thread(); + + virtual void Run(); + void Start(); + bool IsAlive() const; + void Join(); + ThreadId GetId() const; + + void SetName(const char*); + const char* GetName() const; + + void Startup(); + void Cleanup(); + + static void Sleep(long millis, int nano = 0); + static void Yield(); + +private: + Thread(const Thread&); + const Thread& operator=(const Thread&); + static void* ThreadRoute(void* pArg); + +private: + ThreadId m_threadId; + unsigned int m_threadNumber; + char m_name[128]; + bool m_started; + Mutex m_mutex; + + static kpr::AtomicInteger s_threadNumber; +}; +typedef kpr::RefHandleT<Thread> ThreadPtr; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp b/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp new file mode 100755 index 0000000..32cba5b --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/ThreadLocal.cpp @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "ThreadLocal.h" + +#include <errno.h> + +#include "Exception.h" + +namespace kpr +{ +ThreadLocal::ThreadLocal() + : m_Key(0) +{ + int retcode = 0; + + retcode = pthread_key_create(&m_Key, 0); + if (retcode != 0) + { + THROW_EXCEPTION(SystemCallException, "pthread_key_create error", errno); + } +} + +ThreadLocal::~ThreadLocal() +{ + pthread_key_delete(m_Key); +} + +void* ThreadLocal::GetValue() +{ + void* v; + v = pthread_getspecific(m_Key); + return v; +} + +void ThreadLocal::SetValue(void* value) +{ + int retcode = pthread_setspecific(m_Key, value); + if (retcode != 0) + { + THROW_EXCEPTION(SystemCallException, "pthread_setspecific error", errno); + } +} +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/ThreadLocal.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/ThreadLocal.h b/rocketmq-client4cpp/src/kpr/ThreadLocal.h new file mode 100644 index 0000000..9ec8f43 --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/ThreadLocal.h @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __KPR_THREADLOCAL_H__ +#define __KPR_THREADLOCAL_H__ + +#include "KPRTypes.h" + +namespace kpr +{ +class ThreadLocal +{ +public: + ThreadLocal(); + virtual ~ThreadLocal(); + + void* GetValue(); + void SetValue(void* value); + +private: + ThreadKey m_Key; +}; +}; + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/ThreadPool.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/ThreadPool.cpp b/rocketmq-client4cpp/src/kpr/ThreadPool.cpp new file mode 100755 index 0000000..32557a8 --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/ThreadPool.cpp @@ -0,0 +1,418 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "ThreadPool.h" + +#include "RocketMQClient.h" +#include "ScopedLock.h" +#include "KPRUtil.h" + +namespace kpr +{ +ThreadPoolWorker:: ThreadPoolWorker(ThreadPool* pThreadPool, const char* strName) + : kpr::Thread(strName), + m_pThreadPool(pThreadPool), + m_canWork(false), + m_isWaiting(false), + m_stop(false), + m_idleTime(0), + m_idle(true) +{ + +} + +bool ThreadPoolWorker::IsIdle() +{ + kpr::ScopedLock<kpr::Monitor> lock(*this); + return m_idle; +} + +void ThreadPoolWorker:: SetIdle(bool idle) +{ + kpr::ScopedLock<kpr::Monitor> lock(*this); + m_idle = idle; + m_idleTime = 0; +} + +int ThreadPoolWorker::IdleTime(int idleTime) +{ + if (m_idle) + { + m_idleTime += idleTime; + } + else + { + m_idleTime = 0; + } + + return m_idleTime; +} + +void ThreadPoolWorker::Run() +{ + while (!m_stop) + { + SetIdle(true); + { + kpr::ScopedLock<kpr::Monitor> lock(*this); + while (!m_canWork) + { + try + { + m_isWaiting = true; + Wait(); + m_isWaiting = false; + } + catch (...) + { + } + } + + m_canWork = false; + } + + while (!m_stop) + { + ThreadPoolWorkPtr request = m_pThreadPool->GetWork(this); + if ((ThreadPoolWork*)(NULL) == request) + { + break; + } + + SetIdle(false); + + try + { + request->Do(); + } + catch(...) 
+ { + RMQ_ERROR("thead[%s] doWork exception", GetName()); + } + + //delete request; + request = NULL; + } + + if (m_stop || m_pThreadPool->IsDestroy()) + { + break; + } + } + + m_pThreadPool ->RemoveThread(this); + m_pThreadPool = NULL; +} + +void ThreadPoolWorker::WakeUp() +{ + SetIdle(false); + kpr::ScopedLock<kpr::Monitor> lock(*this); + m_canWork = true; + Notify(); +} + +void ThreadPoolWorker::Stop() +{ + kpr::ScopedLock<kpr::Monitor> lock(*this); + m_canWork = true; + m_stop = true; + Notify(); +} + +bool ThreadPoolWorker:: IsWaiting() +{ + kpr::ScopedLock<kpr::Monitor> lock(*this); + return m_isWaiting; +} + +ThreadPool::ThreadPool(const char* name, + int count, + int minCount, + int maxCount, + int step, + int maxIdleTime, + int checkldleThreadsInterval) +{ + if (name == NULL) + { + snprintf(m_name, sizeof(m_name), "ThreadPool"); + } + else + { + snprintf(m_name, sizeof(m_name), "%s", name); + } + + m_destroy = false; + m_minCount = minCount; + m_maxCount = maxCount; + m_maxIdleTime = maxIdleTime; + m_count = 0; + m_step = step; + m_index = 0; + + m_lastRemoveIdleThreadsTime = KPRUtil::GetCurrentTimeMillis(); + + if (m_minCount <= 0) + { + m_minCount = MIN_THREAD_COUNT; + } + + if (m_maxCount < 0) + { + m_maxCount = MAX_THREAD_COUNT; + } + + if (m_maxIdleTime < 0) + { + m_maxIdleTime = MAX_IDLE_THREAD_TIME; + } + + if (m_maxCount != 0 && m_maxCount < m_minCount) + { + m_minCount = MIN_THREAD_COUNT; + } + + if ((m_maxCount != 0 && count > m_maxCount) || count < m_minCount) + { + count = m_minCount; + } + + if (checkldleThreadsInterval < 0) + { + checkldleThreadsInterval = CHECK_IDLE_THREADS_INTERVAL; + } + + AddThreads(count); + + char manager_name[32]; + snprintf(manager_name, sizeof(manager_name), "%s-manager", m_name); + m_manager = new ThreadPoolManage(manager_name, this, checkldleThreadsInterval); + m_manager->Start(); +} + +ThreadPool::~ThreadPool() +{ + Destroy(); +} + +void ThreadPool::AddThreads(int count) +{ + char threadName[256]; + + for (int i = 
0; i < count; ++i) + { + snprintf(threadName, sizeof(threadName), "%s-Worker%d", m_name, m_index); + + try + { + ThreadPoolWorkerPtr worker = new ThreadPoolWorker(this, threadName); + worker->Start(); + + m_workers.push_back(worker); + while (!worker->IsWaiting()) + { + kpr::Thread::Sleep(0, 100000); + } + + m_index++; + m_count++; + } + catch (...) + { + RMQ_ERROR("ThreadPool thead[%s] new exception", threadName); + } + } +} + +void ThreadPool::Destroy() +{ + std::list<ThreadPoolWorkerPtr> workers; + { + kpr::ScopedLock<kpr::Monitor> lock(*this); + if (m_destroy) + { + return; + } + + m_destroy = true; + + std::list<ThreadPoolWorkerPtr>::iterator iter; + for (iter = m_workers.begin(); iter != m_workers.end(); iter++) + { + workers.push_back(*iter); + (*iter)->Stop(); + } + } + + m_manager->Stop(); + m_manager->Join(); + + std::list<ThreadPoolWorkerPtr>::iterator itThread; + for (itThread = workers.begin(); itThread != workers.end(); itThread++) + { + (*itThread)->Join(); + } + m_works.clear(); +} + +int ThreadPool::AddWork(ThreadPoolWorkPtr pWork) +{ + kpr::ScopedLock<kpr::Monitor> lock(*this); + if (m_destroy) + { + return -1; + } + + m_works.push_back(pWork); + + if (!WakeOneThread()) + { + if (0 == m_maxCount || m_count < m_maxCount) + { + int step = m_step; + + if (0 < m_maxCount && m_count + m_step > m_maxCount) + { + step = m_maxCount - m_count; + } + + AddThreads(step); + WakeOneThread(); + } + } + + return 0; +} + +ThreadPoolWorkPtr ThreadPool::GetWork(ThreadPoolWorker* pWorker) +{ + kpr::ScopedLock<kpr::Monitor> lock(*this); + ThreadPoolWorkPtr result = NULL; + + if (!m_destroy && !m_works.empty()) + { + result = m_works.front(); + m_works.pop_front(); + } + + return result; +} + +bool ThreadPool::IsDestroy() +{ + kpr::ScopedLock<kpr::Monitor> lock(*this); + return m_destroy; +} + +void ThreadPool::RemoveThread(ThreadPoolWorker* workerThread) +{ + kpr::ScopedLock<kpr::Monitor> lock(*this); + + std::list<ThreadPoolWorkerPtr>::iterator it = 
m_workers.begin(); + + for (; it != m_workers.end(); it++) + { + if ((*it) == workerThread) + { + m_workers.erase(it); + m_count--; + break; + } + } +} + +void ThreadPool::RemoveIdleThreads() +{ + kpr::ScopedLock<kpr::Monitor> lock(*this); + + if (m_maxIdleTime == 0) + { + return; + } + + unsigned long long time = KPRUtil::GetCurrentTimeMillis(); + int interval = (int)(time - m_lastRemoveIdleThreadsTime); + m_lastRemoveIdleThreadsTime = time; + + std::list<ThreadPoolWorkerPtr>::iterator it = m_workers.begin(); + int size = (int)m_workers.size(); + while (size > m_minCount && it != m_workers.end()) + { + if ((*it)->IdleTime(interval) > m_maxIdleTime) + { + (*it)->Stop(); + size--; + } + + it++; + } +} + +bool ThreadPool::WakeOneThread() +{ + std::list<ThreadPoolWorkerPtr>::iterator it = m_workers.begin(); + for (; it != m_workers.end(); it++) + { + if ((*it)->IsIdle()) + { + (*it)->WakeUp(); + return true; + } + } + + return false; +} + +ThreadPoolManage::ThreadPoolManage(const char* name, ThreadPool* pThreadPool, int checkldleThreadsInterval) + : kpr::Thread(name), + m_pThreadPool(pThreadPool), + m_stop(false), + m_checkIdleThreadsInterval(checkldleThreadsInterval) +{ +} + +ThreadPoolManage::~ThreadPoolManage() +{ +} + +void ThreadPoolManage::Stop() +{ + kpr::ScopedLock<kpr::Monitor> lock(*this); + m_stop = true; + Notify(); +} + +void ThreadPoolManage::Run() +{ + while (!m_stop) + { + { + kpr::ScopedLock<kpr::Monitor> lock(*this); + if (!m_stop) + { + Wait(m_checkIdleThreadsInterval); + } + + if (m_stop) + { + break; + } + } + + m_pThreadPool->RemoveIdleThreads(); + } +} +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/ThreadPool.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/ThreadPool.h b/rocketmq-client4cpp/src/kpr/ThreadPool.h new file mode 100755 index 0000000..2c7e3ff --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/ThreadPool.h 
@@ -0,0 +1,124 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __KPR_THREADPOOL_H__ +#define __KPR_THREADPOOL_H__ + +#include<time.h> +#include <assert.h> +#include <list> +#include "Mutex.h" +#include "Thread.h" +#include "Monitor.h" + +#include "ThreadPoolWork.h" + +namespace kpr +{ +const int MAX_THREAD_COUNT = 300; +const int MIN_THREAD_COUNT = 1; +//const int MAX_IDLE_THREAD_TIME = 600000; +const int MAX_IDLE_THREAD_TIME = 0; +const int THREAD_STEP = 10; +const int CHECK_IDLE_THREADS_INTERVAL = 30000; + +class ThreadPool; +class ThreadPoolWorker : public kpr::Thread, public kpr::Monitor +{ +public: + ThreadPoolWorker(ThreadPool* pThreadPool, const char* strName); + + virtual void Run(); + void WakeUp(); + void Stop(); + bool IsWaiting(); + bool IsIdle(); + void SetIdle(bool idle); + int IdleTime(int idleTime); + +private: + ThreadPool* m_pThreadPool; + bool m_canWork; + bool m_isWaiting; + bool m_stop; + int m_idleTime; + bool m_idle; +}; +typedef kpr::RefHandleT<ThreadPoolWorker> ThreadPoolWorkerPtr; + +class ThreadPoolManage : public kpr::Thread, public kpr::Monitor +{ +public: + ThreadPoolManage(const char* name, ThreadPool* pThreadPool, int nCheckldleThreadsInterval); + + ~ThreadPoolManage(); + virtual void Run(); + void Stop(); + +private: + ThreadPool* m_pThreadPool; + bool m_stop; + int m_checkIdleThreadsInterval; +}; +typedef kpr::RefHandleT<ThreadPoolManage> 
ThreadPoolManagePtr; + + +class ThreadPool : public kpr::RefCount, public kpr::Monitor +{ +public: + ThreadPool(const char* name, + int initCount, + int minCount, + int maxCount, + int step = THREAD_STEP, + int maxIdleTime = MAX_IDLE_THREAD_TIME, + int checkldleThreadsInterval = CHECK_IDLE_THREADS_INTERVAL); + + ~ThreadPool(); + void Destroy(); + + int AddWork(ThreadPoolWorkPtr pWork); + ThreadPoolWorkPtr GetWork(ThreadPoolWorker* pWorker); + + void RemoveIdleThreads(); + void RemoveThread(ThreadPoolWorker* pWorker); + + bool WakeOneThread(); + bool IsDestroy(); + +private: + void AddThreads(int count); + +private: + bool m_destroy; + int m_minCount; + int m_maxCount; + int m_maxIdleTime; + int m_count; + int m_step; + + char m_name[128]; + unsigned int m_index; + unsigned long long m_lastRemoveIdleThreadsTime; + + ThreadPoolManagePtr m_manager; + std::list<ThreadPoolWorkPtr> m_works; + std::list<ThreadPoolWorkerPtr> m_workers; +}; + +typedef kpr::RefHandleT<ThreadPool> ThreadPoolPtr; + +} +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h b/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h new file mode 100644 index 0000000..30dfe6c --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/ThreadPoolWork.h @@ -0,0 +1,34 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __THREADPOOLWORK_H__ +#define __THREADPOOLWORK_H__ + +#include "RefHandle.h" + +namespace kpr +{ + +class ThreadPoolWork : public kpr::RefCount +{ +public: + virtual ~ThreadPoolWork() {} + virtual void Do() = 0; +}; +typedef kpr::RefHandleT<ThreadPoolWork> ThreadPoolWorkPtr; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp b/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp new file mode 100755 index 0000000..42ef672 --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/TimerTaskManager.cpp @@ -0,0 +1,91 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include "TimerTaskManager.h" +#include "ThreadPool.h" +#include "ScopedLock.h" + +namespace kpr +{ +TimerTaskManager::TimerTaskManager() +{ +} + +TimerTaskManager::~TimerTaskManager() +{ +} + +int TimerTaskManager::Init(int maxThreadCount, int checklnteval) +{ + try + { + m_pThreadPool = new ThreadPool("TimerThreadPool", 5, 5, maxThreadCount); + m_timerThread = new TimerThread("TimerThread", checklnteval); + m_timerThread->Start(); + } + catch (...) 
+ { + return -1; + } + + return 0; +} + +unsigned int TimerTaskManager::RegisterTimer(unsigned int initialDelay, unsigned int elapse, TimerTaskPtr pTask) +{ + unsigned int id = m_timerThread->RegisterTimer(initialDelay, elapse, this, true); + + kpr::ScopedLock<kpr::Mutex> lock(m_mutex); + m_timerTasks[id] = pTask; + + return id; +} + +bool TimerTaskManager::UnRegisterTimer(unsigned int timerId) +{ + bool ret = m_timerThread->UnRegisterTimer(timerId); + + kpr::ScopedLock<kpr::Mutex> lock(m_mutex); + m_timerTasks.erase(timerId); + + return ret; +} + +bool TimerTaskManager::ResetTimer(unsigned int timerId) +{ + return m_timerThread->ResetTimer(timerId); +} + +void TimerTaskManager::OnTimeOut(unsigned int timerId) +{ + kpr::ScopedLock<kpr::Mutex> lock(m_mutex); + std::map<unsigned int, TimerTaskPtr>::iterator it = m_timerTasks.find(timerId); + if (it != m_timerTasks.end()) + { + if (!it->second->IsProcessing()) + { + it->second->SetProcessing(true); + m_pThreadPool->AddWork((it->second).ptr()); + } + } +} + +void TimerTaskManager::Stop() +{ + m_timerThread->Stop(); + m_timerThread->Join(); + m_pThreadPool->Destroy(); +} +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/TimerTaskManager.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/TimerTaskManager.h b/rocketmq-client4cpp/src/kpr/TimerTaskManager.h new file mode 100755 index 0000000..b9cc2e0 --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/TimerTaskManager.h @@ -0,0 +1,95 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. 
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#ifndef __KPR_TIMERTASKMANAGER_H__ +#define __KPR_TIMERTASKMANAGER_H__ + +#include <list> +#include <map> + +#include "RocketMQClient.h" +#include "TimerThread.h" +#include "ThreadPool.h" +#include "ThreadPoolWork.h" + +namespace kpr +{ + +class TimerTask : public kpr::ThreadPoolWork +{ +public: + TimerTask() + : m_isProcessing(false) + { + } + + virtual ~TimerTask() + { + } + + virtual void Do() + { + try + { + DoTask(); + } + catch(...) + { + RMQ_ERROR("TimerTask exception"); + } + m_isProcessing = false; + } + + bool IsProcessing() + { + return m_isProcessing; + } + + void SetProcessing(bool isProcessing) + { + m_isProcessing = isProcessing; + } + + virtual void DoTask() = 0; + +private: + bool m_isProcessing; +}; +typedef kpr::RefHandleT<TimerTask> TimerTaskPtr; + + +class TimerTaskManager : public TimerHandler +{ +public: + TimerTaskManager(); + virtual ~TimerTaskManager(); + + int Init(int maxThreadCount, int checklnteval); + unsigned int RegisterTimer(unsigned int initialDelay, unsigned int elapse, TimerTaskPtr pTask); + bool UnRegisterTimer(unsigned int timerId); + bool ResetTimer(unsigned int timerId); + void Stop(); + + virtual void OnTimeOut(unsigned int timerId); + +private: + std::map<unsigned int, TimerTaskPtr> m_timerTasks; + kpr::Mutex m_mutex; + TimerThreadPtr m_timerThread; + kpr::ThreadPoolPtr m_pThreadPool; +}; + +} +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/TimerThread.cpp ---------------------------------------------------------------------- diff --git 
a/rocketmq-client4cpp/src/kpr/TimerThread.cpp b/rocketmq-client4cpp/src/kpr/TimerThread.cpp new file mode 100755 index 0000000..b127074 --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/TimerThread.cpp @@ -0,0 +1,186 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include "TimerThread.h" +#include "KPRUtil.h" +#include "ScopedLock.h" + +namespace kpr +{ +unsigned int TimerThread::s_nextTimerID = 0; + +TimerThread::TimerThread(const char* name, unsigned int checklnterval) + : kpr::Thread(name), m_closed(false), m_checkInterval(checklnterval) +{ +} + +TimerThread::~TimerThread() +{ +} + +void TimerThread::Run() +{ + unsigned long long lastCheckTime = KPRUtil::GetCurrentTimeMillis(); + unsigned long long currentCheckTime = lastCheckTime; + + while (!m_closed) + { + currentCheckTime = KPRUtil::GetCurrentTimeMillis(); + unsigned int elapse = (unsigned int)(currentCheckTime - lastCheckTime); + + std::list<TimerInfo> timeList; + + CheckTimeOut(elapse, timeList); + + if (!timeList.empty()) + { + std::list<TimerInfo>::iterator it = timeList.begin(); + for (; it != timeList.end(); it++) + { + try + { + it->pTimerHandler->OnTimeOut(it->id); + } + catch(...) 
+ { + RMQ_ERROR("TimerThread[%s] OnTimeOut exception", GetName()); + } + } + } + + unsigned long long checkEndTime = KPRUtil::GetCurrentTimeMillis(); + int sleepTime = m_checkInterval - (int)(checkEndTime - currentCheckTime); + if (sleepTime < 0) + { + sleepTime = 0; + } + + lastCheckTime = currentCheckTime; + + try + { + kpr::ScopedLock<kpr::Monitor> lock(*this); + Wait(sleepTime); + } + catch (...) + { + } + } +} + +void TimerThread::Stop() +{ + m_closed = true; + kpr::ScopedLock<kpr::Monitor> lock(*this); + Notify(); +} + +unsigned int TimerThread::RegisterTimer(unsigned int initialDelay, unsigned int elapse, TimerHandler* pHandler, bool persistent) +{ + TimerInfo info; + info.elapse = elapse; + info.outTime = elapse - initialDelay; + info.pTimerHandler = pHandler; + info.persistent = persistent; + + kpr::ScopedLock<kpr::Mutex> lock(m_mutex); + info.id = GetNextTimerID(); + m_timers[info.id] = info; + + return info.id; +} + +bool TimerThread::UnRegisterTimer(unsigned int timerId) +{ + bool result = false; + kpr::ScopedLock<kpr::Mutex> lock(m_mutex); + std::map<unsigned int, TimerInfo>::iterator it = m_timers.find(timerId); + if (it != m_timers.end()) + { + m_timers.erase(it); + result = true; + } + + return result; +} + +bool TimerThread::ResetTimer(unsigned int timerId) +{ + bool result = false; + kpr::ScopedLock<kpr::Mutex> lock(m_mutex); + std::map<unsigned int, TimerInfo>::iterator it = m_timers.find(timerId); + if (it != m_timers.end()) + { + if (it->second.persistent) + { + it->second.outTime = it->second.elapse; + } + else + { + it->second.outTime = 0; + } + + result = true; + } + + return result; +} + +bool TimerThread::CheckTimeOut(unsigned int elapse, std::list<TimerInfo>& timerList) +{ + bool result = false; + timerList.clear(); + + kpr::ScopedLock<kpr::Mutex> lock(m_mutex); + if (!m_timers.empty()) + { + std::map<unsigned int, TimerInfo>::iterator it = m_timers.begin(); + while (it != m_timers.end()) + { + it->second.outTime += elapse; + + if 
(it->second.outTime >= int(it->second.elapse)) + { + timerList.push_back(it->second); + + if (it->second.persistent) + { + it->second.outTime = 0; + ++it; + } + else + { + std::map<unsigned int, TimerInfo>::iterator it1 = it; + ++it; + m_timers.erase(it1); + } + } + else + { + ++it; + } + } + + result = true; + } + + return result; +} + +unsigned int TimerThread::GetNextTimerID() +{ + return ++s_nextTimerID; +} +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/kpr/TimerThread.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/kpr/TimerThread.h b/rocketmq-client4cpp/src/kpr/TimerThread.h new file mode 100755 index 0000000..7e02a79 --- /dev/null +++ b/rocketmq-client4cpp/src/kpr/TimerThread.h @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef __KPR_TIMERTHREAD_H__ +#define __KPR_TIMERTHREAD_H__ + +#include <list> +#include <map> + +#include "RocketMQClient.h" +#include "Thread.h" +#include "Mutex.h" +#include "Monitor.h" + +namespace kpr +{ +class TimerHandler +{ +public: + TimerHandler() + { + } + + virtual ~TimerHandler() + { + } + + virtual void OnTimeOut(unsigned int timerID) = 0; +}; + +typedef struct tagTimerlnfo +{ + unsigned int id; + unsigned int elapse; + int outTime; + bool persistent; + TimerHandler* pTimerHandler; +} TimerInfo; + + +class TimerThread : public kpr::Thread, public kpr::Monitor +{ +public: + TimerThread(const char* name, unsigned int checklnterval); + virtual ~TimerThread(); + virtual void Run(); + virtual void Stop(); + + virtual unsigned int RegisterTimer(unsigned int initialDelay, unsigned int elapse, TimerHandler* pHandler, bool persistent = true); + virtual bool UnRegisterTimer(unsigned int timerId); + virtual bool ResetTimer(unsigned int timerId); + +private: + bool CheckTimeOut(unsigned int elapse, std::list<TimerInfo>& timerList); + static unsigned int GetNextTimerID(); + +private: + static unsigned int s_nextTimerID; + std::map<unsigned int, TimerInfo> m_timers; + kpr::Mutex m_mutex; + bool m_closed; + unsigned int m_checkInterval; +}; +typedef kpr::RefHandleT<TimerThread> TimerThreadPtr; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/message/Message.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/message/Message.cpp b/rocketmq-client4cpp/src/message/Message.cpp new file mode 100755 index 0000000..db88c3e --- /dev/null +++ b/rocketmq-client4cpp/src/message/Message.cpp @@ -0,0 +1,379 @@ +/** + * Copyright (C) 2013 kangliqiang ,[email protected] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "Message.h" + +#include <string.h> +#include <stdlib.h> +#include <stdio.h> +#include "UtilAll.h" + + +namespace rmq +{ + +const std::string Message::PROPERTY_KEYS = "KEYS"; +const std::string Message::PROPERTY_TAGS = "TAGS"; +const std::string Message::PROPERTY_WAIT_STORE_MSG_OK = "WAIT"; +const std::string Message::PROPERTY_DELAY_TIME_LEVEL = "DELAY"; +const std::string Message::PROPERTY_RETRY_TOPIC = "RETRY_TOPIC"; +const std::string Message::PROPERTY_REAL_TOPIC = "REAL_TOPIC"; +const std::string Message::PROPERTY_REAL_QUEUE_ID = "REAL_QID"; +const std::string Message::PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG"; +const std::string Message::PROPERTY_PRODUCER_GROUP = "PGROUP"; +const std::string Message::PROPERTY_MIN_OFFSET = "MIN_OFFSET"; +const std::string Message::PROPERTY_MAX_OFFSET = "MAX_OFFSET"; +const std::string Message::PROPERTY_BUYER_ID = "BUYER_ID"; +const std::string Message::PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID"; +const std::string Message::PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG"; +const std::string Message::PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG"; +const std::string Message::PROPERTY_MQ2_FLAG = "MQ2_FLAG"; +const std::string Message::PROPERTY_RECONSUME_TIME = "RECONSUME_TIME"; +const std::string Message::PROPERTY_MSG_REGION = "MSG_REGION"; +const std::string Message::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY"; +const std::string Message::PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES"; +const std::string Message::PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME"; +const std::string 
Message::KEY_SEPARATOR = " "; + +Message::Message() +{ + Init("", "", "", 0, NULL, 0, true); +} + +Message::Message(const std::string& topic, const char* body, int len) +{ + Init(topic, "", "", 0, body, len, true); +} + +Message::Message(const std::string& topic, const std::string& tags, const char* body, int len) +{ + Init(topic, tags, "", 0, body, len, true); +} + +Message::Message(const std::string& topic, const std::string& tags, const std::string& keys, const char* body, int len) +{ + Init(topic, tags, keys, 0, body, len, true); +} + +Message::Message(const std::string& topic, + const std::string& tags, + const std::string& keys, + const int flag, + const char* body, + int len, + bool waitStoreMsgOK) +{ + Init(topic, tags, keys, flag, body, len, waitStoreMsgOK); +} + +Message::~Message() +{ + if (m_body) + { + free(m_body); + m_body = NULL; + m_bodyLen = 0; + } + + if (m_compressBody) + { + free(m_compressBody); + m_compressBody = NULL; + m_compressBodyLen = 0; + } +} + +Message::Message(const Message& other) +{ + m_body = (char*)malloc(other.m_bodyLen); + m_bodyLen = other.m_bodyLen; + memcpy(m_body, other.m_body, other.m_bodyLen); + + m_compressBody = NULL; + m_compressBodyLen = 0; + + m_topic = other.m_topic; + m_flag = other.m_flag; + m_properties = other.m_properties; +} + +Message& Message::operator=(const Message& other) +{ + if (this != &other) + { + if (m_body) + { + free(m_body); + m_body = NULL; + m_bodyLen = 0; + } + + if (m_compressBody) + { + free(m_compressBody); + m_compressBody = NULL; + m_compressBodyLen = 0; + } + + m_body = (char*)malloc(other.m_bodyLen);; + m_bodyLen = other.m_bodyLen; + memcpy(m_body, other.m_body, other.m_bodyLen); + + m_topic = other.m_topic; + m_flag = other.m_flag; + m_properties = other.m_properties; + } + + return *this; +} + +void Message::clearProperty(const std::string& name) +{ + m_properties.erase(name); +} + +void Message::putProperty(const std::string& name, const std::string& value) +{ + m_properties[name] = 
value; +} + +std::string Message::getProperty(const std::string& name) +{ + std::map<std::string, std::string>::const_iterator it = m_properties.find(name); + return (it == m_properties.end()) ? "" : it->second; +} + +std::string Message::getTopic()const +{ + return m_topic; +} + +void Message::setTopic(const std::string& topic) +{ + m_topic = topic; +} + +std::string Message::getTags() +{ + return getProperty(PROPERTY_TAGS); +} + +void Message::setTags(const std::string& tags) +{ + putProperty(PROPERTY_TAGS, tags); +} + +std::string Message::getKeys() +{ + return getProperty(PROPERTY_KEYS); +} + +void Message::setKeys(const std::string& keys) +{ + putProperty(PROPERTY_KEYS, keys); +} + +void Message::setKeys(const std::list<std::string> keys) +{ + if (keys.empty()) + { + return; + } + + std::list<std::string>::const_iterator it = keys.begin(); + std::string str; + str += *it; + it++; + + for (; it != keys.end(); it++) + { + str += KEY_SEPARATOR; + str += *it; + } + + setKeys(str); +} + +int Message::getDelayTimeLevel() +{ + std::string tmp = getProperty(PROPERTY_DELAY_TIME_LEVEL); + if (!tmp.empty()) + { + return atoi(tmp.c_str()); + } + + return 0; +} + +void Message::setDelayTimeLevel(int level) +{ + char tmp[16]; + snprintf(tmp, sizeof(tmp), "%d", level); + + putProperty(PROPERTY_DELAY_TIME_LEVEL, tmp); +} + +bool Message::isWaitStoreMsgOK() +{ + std::string tmp = getProperty(PROPERTY_WAIT_STORE_MSG_OK); + if (tmp.empty()) + { + return true; + } + else + { + return (tmp == "true") ? 
true : false; + } +} + +void Message::setWaitStoreMsgOK(bool waitStoreMsgOK) +{ + if (waitStoreMsgOK) + { + putProperty(PROPERTY_WAIT_STORE_MSG_OK, "true"); + } + else + { + putProperty(PROPERTY_WAIT_STORE_MSG_OK, "false"); + } +} + +int Message::getFlag() +{ + return m_flag; +} + +void Message::setFlag(int flag) +{ + m_flag = flag; +} + +const char* Message::getBody()const +{ + return m_body; +} + +int Message::getBodyLen()const +{ + return m_bodyLen; +} + +void Message::setBody(const char* body, int len) +{ + if (len > 0) + { + if (m_body) + { + free(m_body); + m_body = NULL; + m_bodyLen = 0; + } + + m_body = (char*)malloc(len); + m_bodyLen = len; + memcpy(m_body, body, len); + } +} + +bool Message::tryToCompress(int compressLevel) +{ + if (m_body != NULL) + { + if (m_compressBody) + { + free(m_compressBody); + m_compressBody = NULL; + m_compressBodyLen = 0; + } + + unsigned char* pOut; + int outLen = 0; + if (UtilAll::compress(m_body, m_bodyLen, &pOut, &outLen, compressLevel)) + { + m_compressBody = (char*)pOut; + m_compressBodyLen = outLen; + return true; + } + } + + return false; +} + + +const char* Message::getCompressBody() const +{ + return m_compressBody; +} + +int Message::getCompressBodyLen() const +{ + return m_compressBodyLen; +} + + + +std::map<std::string, std::string>& Message::getProperties() +{ + return m_properties; +} + +void Message::setProperties(const std::map<std::string, std::string>& properties) +{ + m_properties = properties; +} + +void Message::Init(const std::string& topic, const std::string& tags, const std::string& keys, const int flag, const char* body, int len, bool waitStoreMsgOK) +{ + m_topic = topic; + m_flag = flag; + + m_body = NULL; + m_bodyLen = len; + + m_compressBody = NULL; + m_compressBodyLen = 0; + + if (len > 0) + { + m_body = (char*)malloc(len); + memcpy(m_body, body, len); + } + + if (tags.length() > 0) + { + setTags(tags); + } + + if (keys.length() > 0) + { + setKeys(keys); + } + + setWaitStoreMsgOK(waitStoreMsgOK); 
+} + +std::string Message::toString() const +{ + std::stringstream ss; + ss << "{m_topic=" << m_topic + << ",m_flag=" << m_flag + << ",properties=" << UtilAll::toString(m_properties) + << ",m_bodyLen=" << m_bodyLen + << "}"; + return ss.str(); +} + + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/message/MessageDecoder.cpp ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/message/MessageDecoder.cpp b/rocketmq-client4cpp/src/message/MessageDecoder.cpp new file mode 100755 index 0000000..338121e --- /dev/null +++ b/rocketmq-client4cpp/src/message/MessageDecoder.cpp @@ -0,0 +1,366 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +#include "MessageDecoder.h" + +#include <string.h> +#include <stdio.h> +#include <stdlib.h> +#include <sstream> +#include "MessageExt.h" +#include "MessageSysFlag.h" +#include "UtilAll.h" + +namespace rmq +{ + +const char MessageDecoder::NAME_VALUE_SEPARATOR = 1; +const char MessageDecoder::PROPERTY_SEPARATOR = 2; +const int MessageDecoder::MSG_ID_LENGTH = 8 + 8; + +int MessageDecoder::MessageMagicCodePostion = 4; +int MessageDecoder::MessageFlagPostion = 16; +int MessageDecoder::MessagePhysicOffsetPostion = 28; +int MessageDecoder::MessageStoreTimestampPostion = 56; + +std::string MessageDecoder::createMessageId(sockaddr& addr, long long offset) +{ + struct sockaddr_in sa; + memcpy(&sa, &addr, sizeof(sockaddr)); + sa.sin_family = AF_INET; + + int port = ntohs(sa.sin_port); + port = htonl(port); + int ip = sa.sin_addr.s_addr; + + unsigned char* buf = new unsigned char[MSG_ID_LENGTH]; + offset = h2nll(offset); + memcpy(buf, &ip, 4); + memcpy(buf + 4, &port, 4); + memcpy(buf + 8, &offset, 8); + + char* str = new char[2 * MSG_ID_LENGTH + 1]; + memset(str, 0, 2 * MSG_ID_LENGTH + 1); + + for (int i = 0; i < MSG_ID_LENGTH; i++) + { + char tmp[3]; + tmp[2] = 0; + + snprintf(tmp, sizeof(tmp), "%02X", buf[i]); + strncat(str, tmp, sizeof(tmp)); + } + + std::string ret = str; + + delete[] buf; + delete[] str; + + return ret; +} + +MessageId MessageDecoder::decodeMessageId(const std::string& msgId) +{ + std::string ipstr = msgId.substr(0, 8); + std::string portstr = msgId.substr(8, 8); + std::string offsetstr = msgId.substr(16); + + char* end; + int ipint = strtoul(ipstr.c_str(), &end, 16); + int portint = strtoul(portstr.c_str(), &end, 16); + + long long offset = UtilAll::hexstr2ull(offsetstr.c_str()); + + offset = n2hll(offset); + + portint = ntohl(portint); + short port = portint; + + struct sockaddr_in sa; + sa.sin_family = AF_INET; + sa.sin_port = htons(port); + sa.sin_addr.s_addr = ipint; + + sockaddr addr; + memcpy(&addr, &sa, sizeof(sockaddr)); + + MessageId 
id(addr, offset); + + return id; +} + +MessageExt* MessageDecoder::decode(const char* pData, int len, int& offset) +{ + return decode(pData, len, offset, true); +} + +MessageExt* MessageDecoder::decode(const char* pData, int len, int& offset, bool readBody) +{ + MessageExt* msgExt = NULL; + + try + { + msgExt = new MessageExt(); + + // 1 TOTALSIZE + int storeSize; + memcpy(&storeSize, pData, 4); + storeSize = ntohl(storeSize); + + msgExt->setStoreSize(storeSize); + + // 2 MAGICCODE sizeof(int) + + // 3 BODYCRC + int bodyCRC; + memcpy(&bodyCRC, pData + 2 * sizeof(int), 4); + bodyCRC = ntohl(bodyCRC); + msgExt->setBodyCRC(bodyCRC); + + // 4 QUEUEID + int queueId; + memcpy(&queueId, pData + 3 * sizeof(int), 4); + queueId = ntohl(queueId); + msgExt->setQueueId(queueId); + + // 5 FLAG + int flag ; + + memcpy(&flag, pData + 4 * sizeof(int), 4); + flag = ntohl(flag); + + msgExt->setFlag(flag); + + // 6 QUEUEOFFSET + long long queueOffset; + memcpy(&queueOffset, pData + 5 * sizeof(int), 8); + queueOffset = n2hll(queueOffset); + msgExt->setQueueOffset(queueOffset); + + // 7 PHYSICALOFFSET + long long physicOffset; + + memcpy(&physicOffset, pData + 7 * sizeof(int), 8); + physicOffset = n2hll(physicOffset); + msgExt->setCommitLogOffset(physicOffset); + + // 8 SYSFLAG + int sysFlag; + + memcpy(&sysFlag, pData + 9 * sizeof(int), 4); + sysFlag = ntohl(sysFlag); + msgExt->setSysFlag(sysFlag); + + // 9 BORNTIMESTAMP + long long bornTimeStamp; + memcpy(&bornTimeStamp, pData + 10 * sizeof(int), 8); + bornTimeStamp = n2hll(bornTimeStamp); + + msgExt->setBornTimestamp(bornTimeStamp); + + // 10 BORNHOST + int bornHost;//c0 a8 00 68 192.168.0.104 c0 a8 00 68 00 00 c4 04 + memcpy(&bornHost, pData + 12 * sizeof(int), 4); + + int port; + memcpy(&port, pData + 13 * sizeof(int), 4); + port = ntohl(port); + + struct sockaddr_in sa; + sa.sin_family = AF_INET; + sa.sin_port = htons(port); + sa.sin_addr.s_addr = bornHost; + + sockaddr bornAddr; + memcpy(&bornAddr, &sa, sizeof(sockaddr)); + 
msgExt->setBornHost(bornAddr); + + // 11 STORETIMESTAMP + long long storeTimestamp; + memcpy(&storeTimestamp, pData + 14 * sizeof(int), 8); + storeTimestamp = n2hll(storeTimestamp); + msgExt->setStoreTimestamp(storeTimestamp); + + // 12 STOREHOST + int storeHost; + memcpy(&storeHost, pData + 16 * sizeof(int), 4); + memcpy(&port, pData + 17 * sizeof(int), 4); + port = ntohl(port); + + sa.sin_family = AF_INET; + sa.sin_port = htons(port); + sa.sin_addr.s_addr = storeHost; + + sockaddr storeAddr; + memcpy(&storeAddr, &sa, sizeof(sockaddr)); + + msgExt->setStoreHost(storeAddr); + + // 13 RECONSUMETIMES + int reconsumeTimes; + memcpy(&reconsumeTimes, pData + 18 * sizeof(int), 4); + reconsumeTimes = ntohl(reconsumeTimes); + msgExt->setReconsumeTimes(reconsumeTimes); + + // 14 Prepared Transaction Offset + long long preparedTransactionOffset; + memcpy(&preparedTransactionOffset, pData + 19 * sizeof(int), 8); + preparedTransactionOffset = n2hll(preparedTransactionOffset); + msgExt->setPreparedTransactionOffset(preparedTransactionOffset); + + // 15 BODY + int bodyLen = 0; + memcpy(&bodyLen, pData + 21 * sizeof(int), 4); + bodyLen = ntohl(bodyLen); + + if (bodyLen > 0) + { + if (readBody) + { + const char* body = pData + 22 * sizeof(int); + int newBodyLen = bodyLen; + + // uncompress body + if ((sysFlag & MessageSysFlag::CompressedFlag) == MessageSysFlag::CompressedFlag) + { + unsigned char* pOut; + int outLen; + + if (UtilAll::decompress(body, bodyLen, &pOut, &outLen)) + { + msgExt->setBody((char*)pOut, outLen); + free(pOut); + } + else + { + msgExt->setBody(body, newBodyLen); + } + } + else + { + msgExt->setBody(body, newBodyLen); + } + } + else + { + + } + } + + // 16 TOPIC + int topicLen = *(pData + 22 * sizeof(int) + bodyLen); + + char* tmp = new char[topicLen + 1]; + + memcpy(tmp, pData + 22 * sizeof(int) + bodyLen + 1, topicLen); + tmp[topicLen] = 0; + std::string topic = tmp; + + delete[] tmp; + + msgExt->setTopic(topic); + + // 17 properties + short 
propertiesLength; + memcpy(&propertiesLength, pData + 22 * sizeof(int) + bodyLen + 1 + topicLen, 2); + propertiesLength = ntohs(propertiesLength); + + if (propertiesLength > 0) + { + char* properties = new char[propertiesLength + 1]; + memcpy(properties, pData + 22 * sizeof(int) + bodyLen + 1 + topicLen + 2, propertiesLength); + properties[propertiesLength] = 0; + std::string propertiesString = properties; + std::map<std::string, std::string> map; + string2messageProperties(map, propertiesString); + msgExt->setProperties(map); + delete[] properties; + } + + offset = 22 * sizeof(int) + bodyLen + 1 + topicLen + 2 + propertiesLength; + + // ��ϢID + std::string msgId = createMessageId(storeAddr, physicOffset); + msgExt->setMsgId(msgId); + + return msgExt; + } + catch (...) + { + RMQ_ERROR("decode exception"); + if (msgExt) + { + delete msgExt; + msgExt = NULL; + } + } + + return NULL; +} + +std::list<MessageExt*> MessageDecoder::decodes(const char* pData, int len) +{ + return decodes(pData, len, true); +} + +std::list<MessageExt*> MessageDecoder::decodes(const char* pData, int len, bool readBody) +{ + std::list<MessageExt*> list; + + int offset = 0; + while (offset < len) + { + int tmp; + MessageExt* msg = decode(pData + offset, len, tmp); + list.push_back(msg); + offset += tmp; + } + + return list; +} + +std::string MessageDecoder::messageProperties2String(const std::map<std::string, std::string>& properties) +{ + std::stringstream ss; + + std::map<std::string, std::string>::const_iterator it = properties.begin(); + + for (; it != properties.end(); it++) + { + ss << it->first << NAME_VALUE_SEPARATOR << it->second << PROPERTY_SEPARATOR; + } + + return ss.str(); +} + +void MessageDecoder::string2messageProperties(std::map<std::string, std::string>& properties, + std::string& propertiesString) +{ + std::vector<std::string> out; + UtilAll::Split(out, propertiesString, PROPERTY_SEPARATOR); + + for (size_t i = 0; i < out.size(); i++) + { + std::vector<std::string> outValue; 
+ UtilAll::Split(outValue, out[i], NAME_VALUE_SEPARATOR); + + if (outValue.size() == 2) + { + properties[outValue[0]] = outValue[1]; + } + } +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/message/MessageDecoder.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/message/MessageDecoder.h b/rocketmq-client4cpp/src/message/MessageDecoder.h new file mode 100755 index 0000000..a5f24ed --- /dev/null +++ b/rocketmq-client4cpp/src/message/MessageDecoder.h @@ -0,0 +1,64 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/
+#ifndef __MESSAGEDECODER_H__
+#define __MESSAGEDECODER_H__
+
+#include <string>
+#include <list>
+#include <map>
+
+#include "SocketUtil.h"
+#include "MessageId.h"
+
+namespace rmq
+{
+    class MessageExt;
+    class UnknownHostException;
+
+    /**
+    * Message decoder.
+    *
+    * Static-only helpers that turn encoded message bytes into MessageExt
+    * objects and convert message property maps to and from their string form.
+    */
+    class MessageDecoder
+    {
+    public:
+        // Builds the message id string from an address and an offset; decode()
+        // calls it with the store address and physical offset of the message.
+        static std::string createMessageId(sockaddr& addr, long long offset);
+        // Parses a message id string into a MessageId.
+        // NOTE(review): presumably the inverse of createMessageId - confirm in the .cpp.
+        static MessageId decodeMessageId(const std::string& msgId);
+
+        // Decodes a single message starting at pData. On success the decoded
+        // MessageExt is returned and `offset` receives the encoded size of the
+        // message; when decoding throws, the error is logged and NULL is returned.
+        // NOTE(review): the returned object is heap-allocated; ownership
+        // presumably passes to the caller - confirm.
+        static MessageExt* decode(const char* pData, int len, int& offset);
+        static MessageExt* decode(const char* pData, int len, int& offset, bool readBody);
+
+        // Decodes the back-to-back messages stored in the first `len` bytes at
+        // pData by calling decode() repeatedly. The two-argument overload
+        // forwards readBody = true.
+        static std::list<MessageExt*> decodes(const char* pData, int len);
+        static std::list<MessageExt*> decodes(const char* pData, int len, bool readBody);
+
+        // Writes each entry as name + NAME_VALUE_SEPARATOR + value + PROPERTY_SEPARATOR.
+        static std::string messageProperties2String(const std::map<std::string, std::string>& properties);
+        // Splits propertiesString on PROPERTY_SEPARATOR, then each piece on
+        // NAME_VALUE_SEPARATOR, and stores the pieces that yield exactly a
+        // name and a value into `properties`.
+        static void string2messageProperties(std::map<std::string, std::string>& properties,
+                                             std::string& propertiesString);
+
+    public:
+        // Separator between a property name and its value.
+        static const char NAME_VALUE_SEPARATOR;
+        // Separator written after every name/value pair.
+        static const char PROPERTY_SEPARATOR;
+
+        // NOTE(review): presumably the length of an encoded message id; defined in the .cpp.
+        static const int MSG_ID_LENGTH;
+
+        // NOTE(review): presumably byte positions of fields inside an encoded
+        // message; the values are assigned in the .cpp - confirm there.
+        static int MessageMagicCodePostion;
+        static int MessageFlagPostion;
+        static int MessagePhysicOffsetPostion;
+        static int MessageStoreTimestampPostion;
+    };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/message/MessageExt.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/message/MessageExt.cpp b/rocketmq-client4cpp/src/message/MessageExt.cpp
new file mode 100755
index 0000000..35479ce
--- /dev/null
+++ b/rocketmq-client4cpp/src/message/MessageExt.cpp
@@ -0,0 +1,244 @@
+/**
+* Copyright (C) 2013 kangliqiang ,[email protected]
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +#include "MessageExt.h" + +#include <sstream> +#include "MessageSysFlag.h" +#include "SocketUtil.h" + +namespace rmq +{ + +MessageExt::MessageExt() + : m_queueOffset(0), + m_commitLogOffset(0), + m_bornTimestamp(0), + m_storeTimestamp(0), + m_preparedTransactionOffset(0), + m_queueId(0), + m_storeSize(0), + m_sysFlag(0), + m_bodyCRC(0), + m_reconsumeTimes(3), + m_msgId("") +{ +} + +MessageExt::MessageExt(int queueId, + long long bornTimestamp, + sockaddr bornHost, + long long storeTimestamp, + sockaddr storeHost, + std::string msgId) + : m_queueOffset(0), + m_commitLogOffset(0), + m_bornTimestamp(bornTimestamp), + m_storeTimestamp(storeTimestamp), + m_preparedTransactionOffset(0), + m_queueId(queueId), + m_storeSize(0), + m_sysFlag(0), + m_bodyCRC(0), + m_reconsumeTimes(3), + m_bornHost(bornHost), + m_storeHost(storeHost), + m_msgId(msgId) +{ + +} + +MessageExt::~MessageExt() +{ + +} + +int MessageExt::getQueueId() +{ + return m_queueId; +} + +void MessageExt::setQueueId(int queueId) +{ + m_queueId = queueId; +} + +long long MessageExt::getBornTimestamp() +{ + return m_bornTimestamp; +} + +void MessageExt::setBornTimestamp(long long bornTimestamp) +{ + m_bornTimestamp = bornTimestamp; +} + +sockaddr MessageExt::getBornHost() +{ + return m_bornHost; +} + +std::string MessageExt::getBornHostString() +{ + return socketAddress2String(m_bornHost); +} + +std::string MessageExt::getBornHostNameString() +{ + return getHostName(m_bornHost); +} + +void MessageExt::setBornHost(const sockaddr& bornHost) +{ + m_bornHost = bornHost; +} + +long long 
MessageExt::getStoreTimestamp() +{ + return m_storeTimestamp; +} + +void MessageExt::setStoreTimestamp(long long storeTimestamp) +{ + m_storeTimestamp = storeTimestamp; +} + +sockaddr MessageExt::getStoreHost() +{ + return m_storeHost; +} + +std::string MessageExt::getStoreHostString() +{ + return socketAddress2String(m_storeHost); +} + +void MessageExt::setStoreHost(const sockaddr& storeHost) +{ + m_storeHost = storeHost; +} + +std::string MessageExt::getMsgId() +{ + return m_msgId; +} + +void MessageExt::setMsgId(const std::string& msgId) +{ + m_msgId = msgId; +} + +int MessageExt::getSysFlag() +{ + return m_sysFlag; +} + +void MessageExt::setSysFlag(int sysFlag) +{ + m_sysFlag = sysFlag; +} + +int MessageExt::getBodyCRC() +{ + return m_bodyCRC; +} + +void MessageExt::setBodyCRC(int bodyCRC) +{ + m_bodyCRC = bodyCRC; +} + +long long MessageExt::getQueueOffset() +{ + return m_queueOffset; +} + +void MessageExt::setQueueOffset(long long queueOffset) +{ + m_queueOffset = queueOffset; +} + +long long MessageExt::getCommitLogOffset() +{ + return m_commitLogOffset; +} + +void MessageExt::setCommitLogOffset(long long physicOffset) +{ + m_commitLogOffset = physicOffset; +} + +int MessageExt::getStoreSize() +{ + return m_storeSize; +} + +void MessageExt::setStoreSize(int storeSize) +{ + m_storeSize = storeSize; +} + +TopicFilterType MessageExt::parseTopicFilterType(int sysFlag) +{ + if ((sysFlag & MessageSysFlag::MultiTagsFlag) == MessageSysFlag::MultiTagsFlag) + { + return MULTI_TAG; + } + + return SINGLE_TAG; +} + +int MessageExt::getReconsumeTimes() +{ + return m_reconsumeTimes; +} + +void MessageExt::setReconsumeTimes(int reconsumeTimes) +{ + m_reconsumeTimes = reconsumeTimes; +} + +long long MessageExt::getPreparedTransactionOffset() +{ + return m_preparedTransactionOffset; +} + +void MessageExt::setPreparedTransactionOffset(long long preparedTransactionOffset) +{ + m_preparedTransactionOffset = preparedTransactionOffset; +} + +std::string MessageExt::toString() 
const +{ + std::stringstream ss; + ss << "{msgId=" << m_msgId + << ",queueId=" << m_queueId + << ",storeSize=" << m_storeSize + << ",sysFlag=" << m_sysFlag + << ",queueOffset=" << m_queueOffset + << ",commitLogOffset=" << m_commitLogOffset + << ",preparedTransactionOffset=" << m_preparedTransactionOffset + << ",bornTimestamp=" << m_bornTimestamp + << ",bornHost=" << socketAddress2String(m_bornHost) + << ",storeHost=" << socketAddress2String(m_storeHost) + << ",storeTimestamp=" << m_storeTimestamp + << ",reconsumeTimes=" << m_reconsumeTimes + << ",bodyCRC=" << m_bodyCRC + << ",Message=" << Message::toString() + << "}"; + return ss.str(); +} + +} http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/message/MessageId.h ---------------------------------------------------------------------- diff --git a/rocketmq-client4cpp/src/message/MessageId.h b/rocketmq-client4cpp/src/message/MessageId.h new file mode 100644 index 0000000..5237f8d --- /dev/null +++ b/rocketmq-client4cpp/src/message/MessageId.h @@ -0,0 +1,59 @@ +/** +* Copyright (C) 2013 kangliqiang ,[email protected] +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/
+
+#ifndef __MESSAGEID_H__
+#define __MESSAGEID_H__
+
+#include "SocketUtil.h"
+
+namespace rmq
+{
+    /**
+     * Value type pairing a socket address with a 64-bit offset.
+     *
+     * NOTE(review): presumably the store address and offset that a message id
+     * string encodes - confirm against the code that builds and parses ids.
+     */
+    class MessageId
+    {
+    public:
+        /**
+         * @param address socket address to store (copied)
+         * @param offset  offset to store
+         */
+        MessageId(const sockaddr& address, long long offset)
+            : m_address(address), m_offset(offset)
+        {
+
+        }
+
+        // Returns a copy of the stored address; const so it can be called on
+        // a const MessageId.
+        sockaddr getAddress() const
+        {
+            return m_address;
+        }
+
+        void setAddress(const sockaddr& address)
+        {
+            m_address = address;
+        }
+
+        // Returns the stored offset; const so it can be called on a const
+        // MessageId.
+        long long getOffset() const
+        {
+            return m_offset;
+        }
+
+        void setOffset(long long offset)
+        {
+            m_offset = offset;
+        }
+
+    private:
+        sockaddr m_address;
+        long long m_offset;
+    };
+}
+
+#endif
