http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/event.h ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/event.h b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/event.h new file mode 100644 index 0000000..936ecc6 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/event.h @@ -0,0 +1,553 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __EVENT_H__ +#define __EVENT_H__ + +#include <string> +#include <set> +#include <deque> +#include <algorithm> +#ifdef GCC4 +# include <tr1/memory> +using namespace std::tr1; +#else +# include <boost/shared_ptr.hpp> +using namespace boost; +#endif + +#include "log.h" +#include "blockingqueue.h" +#include "mutex.h" +#include "thread.h" + +using namespace std; +using namespace zk; + +namespace zkfuse { + +//forward declaration of EventSource +template<typename E> +class EventSource; + +/** + * \brief This interface is implemented by an observer + * \brief of a particular {@link EventSource}. 
+ */ +template<typename E> +class EventListener { + public: + + /** + * \brief This method is invoked whenever an event + * \brief has been received by the event source being observed. + * + * @param source the source the triggered the event + * @param e the actual event being triggered + */ + virtual void eventReceived(const EventSource<E> &source, const E &e) = 0; +}; + +/** + * \brief This class represents a source of events. + * + * <p> + * Each source can have many observers (listeners) attached to it + * and in case of an event, this source may propagate the event + * using {@link #fireEvent} method. + */ +template<typename E> +class EventSource { + public: + + /** + * \brief The type corresponding to the list of registered event listeners. + */ + typedef set<EventListener<E> *> EventListeners; + + /** + * \brief Registers a new event listener. + * + * @param listener the listener to be added to the set of listeners + */ + void addListener(EventListener<E> *listener) { + m_listeners.insert( listener ); + } + + /** + * \brief Removes an already registered listener. + * + * @param listener the listener to be removed + */ + void removeListener(EventListener<E> *listener) { + m_listeners.erase( listener ); + } + + /** + * \brief Destructor. + */ + virtual ~EventSource() {} + + protected: + + /** + * \brief Fires the given event to all registered listeners. + * + * <p> + * This method essentially iterates over all listeners + * and invokes {@link fireEvent(EventListener<E> *listener, const E &event)} + * for each element. All derived classes are free to + * override the method to provide better error handling + * than the default implementation. + * + * @param event the event to be propagated to all listeners + */ + void fireEvent(const E &event); + + /** + * \brief Sends an event to the given listener. 
+ * + * @param listener the listener to which the event is passed + * @param event the event to be handled + */ + virtual void fireEvent(EventListener<E> *listener, const E &event); + + private: + + /** + * The set of registered event listeners. + */ + EventListeners m_listeners; + +}; + +/** + * \brief The interface of a generic event wrapper. + */ +class AbstractEventWrapper { + public: + + /** + * \brief Destructor. + */ + virtual ~AbstractEventWrapper() {} + + /** + * \brief Returns the underlying wrapee's data. + */ + virtual void *getWrapee() = 0; +}; + +/** + * \brief A template based implementation of {@link AbstractEventWrapper}. + * + * <p> + * The wrapper stores its own copy of the event passed to the constructor. + */ +template<typename E> +class EventWrapper : public AbstractEventWrapper { + public: + EventWrapper(const E &e) : m_e(e) { + } + void *getWrapee() { + return &m_e; + } + private: + E m_e; +}; + +/** + * \brief This class represents a generic event. + */ +class GenericEvent { + public: + + /** + * \brief Constructor. + * + * Creates an event of type 0 that wraps no data. + */ + GenericEvent() : m_type(0) {} + + /** + * \brief Constructor. + * + * @param type the type of this event + * @param eventWrapper the wrapper around event's data; it is stored in a + * shared pointer, so this event takes over its ownership + */ + GenericEvent(int type, AbstractEventWrapper *eventWrapper) : + m_type(type), m_eventWrapper(eventWrapper) { + } + + /** + * \brief Returns the type of this event. + * + * @return type of this event + */ + int getType() const { return m_type; } + + /** + * \brief Returns the event's data. + * + * @return the event's data, or NULL if this event wraps no data + * (e.g. it has been default-constructed) + */ + void *getEvent() const { + //a default-constructed event holds an empty pointer; do not dereference it + return m_eventWrapper.get() != NULL + ? m_eventWrapper->getWrapee() + : NULL; + } + + private: + + /** + * The event type. + */ + int m_type; + + /** + * The event represented as abstract wrapper. + */ + boost::shared_ptr<AbstractEventWrapper> m_eventWrapper; + +}; + +/** + * \brief This class adapts {@link EventListener} to a generic listener. + * Essentially this class listens on incoming events and fires them + * as {@link GenericEvent}s. 
+ */ +template<typename E, const int type> +class EventListenerAdapter : public virtual EventListener<E>, + public virtual EventSource<GenericEvent> +{ + public: + + /** + * \brief Constructor. + * + * <p> + * Registers this adapter as a listener of the given source. Nothing in + * this class unregisters it again, so the caller has to make sure the + * source does not deliver events to an adapter that was already destroyed. + * + * @param eventSource the source with which this listener is registered + */ + EventListenerAdapter(EventSource<E> &eventSource) { + eventSource.addListener(this); + } + + /** + * \brief Wraps the received event in a {@link GenericEvent} tagged with + * \brief the <code>type</code> template argument and fires it to the + * \brief listeners of this adapter. + * + * @param source the source that delivered the event (not used) + * @param e the event to be wrapped + */ + void eventReceived(const EventSource<E> &source, const E &e) { + AbstractEventWrapper *wrapper = new EventWrapper<E>(e); + GenericEvent event(type, wrapper); + fireEvent( event ); + } + +}; + +/** + * \brief This class provides an adapter between an asynchronous and synchronous + * \brief event handling. + * + * <p> + * This class queues up all received events and exposes them through + * {@link #getNextEvent()} method. + */ +template<typename E> +class SynchronousEventAdapter : public EventListener<E> { + public: + + /** + * \brief Puts the received event into the underlying queue. + * + * @param source the source that delivered the event (not used) + * @param e the event to be queued + */ + void eventReceived(const EventSource<E> &source, const E &e) { + m_queue.put( e ); + } + + /** + * \brief Returns the next available event from the underlying queue, + * \brief possibly blocking, if no data is available. + * + * @return the next available event + */ + E getNextEvent() { + return m_queue.take(); + } + + /** + * \brief Returns whether there are any events in the queue or not. + * + * @return true if there is at least one event and + * the next call to {@link #getNextEvent} won't block + */ + bool hasEvents() const { + return (m_queue.empty() ? false : true); + } + + /** + * \brief Destructor. + */ + virtual ~SynchronousEventAdapter() {} + + private: + + /** + * The blocking queue of all events received so far. + */ + BlockingQueue<E> m_queue; + +}; + +/** + * This typedef defines the type of a timer Id. + */ +typedef int32_t TimerId; + +/** + * This class represents a timer event parametrized by the user's data type. + */ +template<typename T> +class TimerEvent { + public: + + /** + * \brief Constructor. 
+ * + * @param id the ID of this event + * @param alarmTime when this event is to be triggered + * @param userData the user data associated with this event + */ + TimerEvent(TimerId id, int64_t alarmTime, const T &userData) : + m_id(id), m_alarmTime(alarmTime), m_userData(userData) + {} + + /** + * \brief Constructor. + * + * Creates an event whose ID and alarm time are both -1. + */ + TimerEvent() : m_id(-1), m_alarmTime(-1) {} + + /** + * \brief Returns the ID. + * + * @return the ID of this event + */ + TimerId getID() const { return m_id; } + + /** + * \brief Returns the alarm time. + * + * @return the alarm time + */ + int64_t getAlarmTime() const { return m_alarmTime; } + + /** + * \brief Returns the user's data. + * + * @return the user's data + */ + T const &getUserData() const { return m_userData; } + + /** + * \brief Returns whether this event's alarm time is less than + * \brief (i.e. earlier than) the given alarm time. + * + * @param alarmTime the alarm time this event's time is compared against + */ + bool operator<(const int64_t alarmTime) const { + return m_alarmTime < alarmTime; + } + + private: + + /** + * The ID of this event. + */ + TimerId m_id; + + /** + * The time at which this event triggers. + */ + int64_t m_alarmTime; + + /** + * The user specific data associated with this event. + */ + T m_userData; + +}; + +/** + * \brief A timer that fires {@link TimerEvent}s to its listeners from a + * \brief dedicated worker thread. + * + * <p> + * The worker thread is started by the constructor, executes + * {@link #sendAlarms} and is joined by the destructor. + */ +template<typename T> +class Timer : public EventSource<TimerEvent<T> > { + public: + + /** + * \brief Constructor. + * + * Starts the worker thread that executes {@link #sendAlarms}. + */ + Timer() : m_currentEventID(0), m_terminating(false) { + m_workerThread.Create( *this, &Timer<T>::sendAlarms ); + } + + /** + * \brief Destructor. + * + * Flags the worker thread as terminating, notifies it and waits + * for it to finish. + */ + ~Timer() { + m_terminating = true; + m_lock.notify(); + m_workerThread.Join(); + } + + /** + * \brief Schedules the given event <code>timeFromNow</code> milliseconds from now. 
+ * + * @param timeFromNow time from now, in milliseconds, when the event + * should be triggered + * @param userData the user data associated with the timer event + * + * @return the ID of the newly created timer event + */ + TimerId scheduleAfter(int64_t timeFromNow, const T &userData) { + return scheduleAt( getCurrentTimeMillis() + timeFromNow, userData ); + } + + /** + * \brief Schedules an event at the given time. + * + * <p> + * The event is inserted in front of the first queued event whose alarm + * time is not earlier than <code>absTime</code>, which keeps the queue + * sorted by alarm time; an event scheduled for the same time as already + * queued ones is therefore placed ahead of them. + * + * @param absTime absolute time, in milliseconds, at which the event + * should be triggered; the time is measured + * from Jan 1st, 1970 + * @param userData the user data associated with the timer event + * + * @return the ID of the newly created timer event + */ + TimerId scheduleAt(int64_t absTime, const T &userData) { + m_lock.lock(); + typename QueueType::iterator pos = + lower_bound( m_queue.begin(), m_queue.end(), absTime ); + TimerId id = m_currentEventID++; + TimerEvent<T> event(id, absTime, userData); + m_queue.insert( pos, event ); + m_lock.notify(); + m_lock.unlock(); + return id; + } + + /** + * \brief Returns the current time since Jan 1, 1970, in milliseconds. + * + * @return the current time in milliseconds + */ + static int64_t getCurrentTimeMillis() { + struct timeval now; + gettimeofday( &now, NULL ); + return now.tv_sec * 1000LL + now.tv_usec / 1000; + } + + /** + * \brief Cancels the given timer event. + * + * <p> + * The queue is scanned for the first event with the given ID and that + * event is removed. An event that is not in the queue cannot be canceled. + * + * @param eventID the ID of the event to be canceled + * + * @return true if the event was found in the queue and removed, + * false otherwise + */ + bool cancelAlarm(TimerId eventID) { + bool canceled = false; + m_lock.lock(); + typename QueueType::iterator i; + for (i = m_queue.begin(); i != m_queue.end(); ++i) { + if (eventID == i->getID()) { + m_queue.erase( i ); + canceled = true; + break; + } + } + m_lock.unlock(); + return canceled; + } + + /** + * Executes the main loop of the worker thread. 
+ */ + void sendAlarms() { + //iterate until terminating + while (!m_terminating) { + m_lock.lock(); + //1 step - wait until there is an event in the queue + if (m_queue.empty()) { + //wait up to 100ms to get next event + m_lock.wait( 100 ); + } + bool fire = false; + if (!m_queue.empty()) { + //retrieve the event from the queue and send it + TimerEvent<T> event = m_queue.front(); + //check whether we can send it right away + int64_t timeToWait = + event.getAlarmTime() - getCurrentTimeMillis(); + if (timeToWait <= 0) { + m_queue.pop_front(); + //we fire only if it's still in the queue and alarm + //time has just elapsed (in case the top event + //is canceled) + fire = true; + } else { + m_lock.wait( timeToWait ); + } + //listeners are notified only after the lock has been released + m_lock.unlock(); + if (fire) { + //fireEvent is a member of a dependent base class, so it has + //to be qualified with this-> to be found by name lookup + this->fireEvent( event ); + } + } else { + m_lock.unlock(); + } + } + } + + private: + + /** + * The type of timer events queue. + */ + typedef deque<TimerEvent<T> > QueueType; + + /** + * The current event ID, auto-incremented each time a new event + * is created. + */ + TimerId m_currentEventID; + + /** + * The queue of timer events sorted by {@link TimerEvent#alarmTime}. + */ + QueueType m_queue; + + /** + * The lock used to guard {@link #m_queue}. + */ + Lock m_lock; + + /** + * The thread that triggers alarms. + */ + CXXThread<Timer<T> > m_workerThread; + + /** + * Whether {@link #m_workerThread} is terminating. + */ + volatile bool m_terminating; + +}; + +template<typename E> +void EventSource<E>::fireEvent(const E &event) { + for (typename EventListeners::iterator i = m_listeners.begin(); + i != m_listeners.end(); + ++i) + { + fireEvent( *i, event ); + } +} + +template<typename E> +void EventSource<E>::fireEvent(EventListener<E> *listener, const E &event) { + listener->eventReceived( *this, event ); +} + +} /* end of 'namespace zkfuse' */ + +#endif /* __EVENT_H__ */
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.cc ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.cc b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.cc new file mode 100644 index 0000000..e2bfb0d --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.cc @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <string> + +#include "log.h" + +using namespace std; + +/** + * \brief This class encapsulates a log4cxx configuration. 
+ */ +class LogConfiguration { + public: + /** + * \brief Configures log4cxx from the given properties file. + * + * The file name is passed to PropertyConfigurator::configureAndWatch + * together with a delay of 5000 (presumably the number of milliseconds + * between checks for changes of the file -- confirm against the + * log4cxx documentation). + * + * @param file the name of the log4cxx properties file + */ + LogConfiguration(const string &file) { + PropertyConfigurator::configureAndWatch( file, 5000 ); + } +}; + +//enforces the configuration to be initialized +static LogConfiguration logConfig( "log4cxx.properties" ); http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.h ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.h b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.h new file mode 100644 index 0000000..aefce10 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log.h @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef __LOG_H__ +#define __LOG_H__ + +#define ZKFUSE_NAMESPACE zkfuse +#define START_ZKFUSE_NAMESPACE namespace ZKFUSE_NAMESPACE { +#define END_ZKFUSE_NAMESPACE } +#define USING_ZKFUSE_NAMESPACE using namespace ZKFUSE_NAMESPACE; + +#include <stdlib.h> +#include <stdio.h> +#include <pthread.h> + +#include <log4cxx/logger.h> +#include <log4cxx/propertyconfigurator.h> +#include <log4cxx/helpers/exception.h> +using namespace log4cxx; +using namespace log4cxx::helpers; + +#define PRINTIP(x) ((uint8_t*)&x)[0], ((uint8_t*)&x)[1], \ + ((uint8_t*)&x)[2], ((uint8_t*)&x)[3] + +#define IPFMT "%u.%u.%u.%u" + +#define DECLARE_LOGGER(varName) \ +extern LoggerPtr varName; + +#define DEFINE_LOGGER(varName, logName) \ +static LoggerPtr varName = Logger::getLogger( logName ); + +/** + * The size, in bytes, of the on-stack buffer into which SPRINTF_LOG_MSG + * formats a message; snprintf truncates anything longer. + */ +#define MAX_BUFFER_SIZE 20000 + +/** + * Declares a character array named <code>buffer</code> and formats the + * printf-style message into it. + */ +#define SPRINTF_LOG_MSG(buffer, fmt, args...) \ + char buffer[MAX_BUFFER_SIZE]; \ + snprintf( buffer, MAX_BUFFER_SIZE, fmt, ##args ); + +/* + * The LOG_* macros below format a printf-style message and pass it to the + * corresponding LOG4CXX_* macro, but only if that level is enabled for the + * given logger. Each macro expands to a bare if-statement, so an invocation + * used as the unbraced body of an if/else changes which 'if' a following + * 'else' binds to; wrap such invocations in braces. + */ + +// older versions of log4cxx don't support tracing +#ifdef LOG4CXX_TRACE +#define LOG_TRACE(logger, fmt, args...) \ + if (logger->isTraceEnabled()) { \ + SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \ + LOG4CXX_TRACE( logger, __tmp ); \ + } +#else +#define LOG_TRACE(logger, fmt, args...) \ + if (logger->isDebugEnabled()) { \ + SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \ + LOG4CXX_DEBUG( logger, __tmp ); \ + } +#endif + +#define LOG_DEBUG(logger, fmt, args...) \ + if (logger->isDebugEnabled()) { \ + SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \ + LOG4CXX_DEBUG( logger, __tmp ); \ + } + +#define LOG_INFO(logger, fmt, args...) \ + if (logger->isInfoEnabled()) { \ + SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \ + LOG4CXX_INFO( logger, __tmp ); \ + } + +#define LOG_WARN(logger, fmt, args...) \ + if (logger->isWarnEnabled()) { \ + SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \ + LOG4CXX_WARN( logger, __tmp ); \ + } + +#define LOG_ERROR(logger, fmt, args...) 
\ + if (logger->isErrorEnabled()) { \ + SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \ + LOG4CXX_ERROR( logger, __tmp ); \ + } + +#define LOG_FATAL(logger, fmt, args...) \ + if (logger->isFatalEnabled()) { \ + SPRINTF_LOG_MSG( __tmp, fmt, ##args ); \ + LOG4CXX_FATAL( logger, __tmp ); \ + } + +/** + * TRACE(logger, x) defines a class named Trace and an object of it named + * traceObj at the point of use. The constructor logs an "Enter" message + * and the destructor an "Exit" message through LOG_TRACE, each together + * with __PRETTY_FUNCTION__ and the pointer value <code>x</code>. + * When DISABLE_TRACE is defined the macro expands to nothing. + */ +#ifdef DISABLE_TRACE +# define TRACE(logger, x) +#else +# define TRACE(logger, x) \ +class Trace { \ + public: \ + Trace(const void* p) : _p(p) { \ + LOG_TRACE(logger, "%s %p Enter", __PRETTY_FUNCTION__, p); \ + } \ + ~Trace() { \ + LOG_TRACE(logger, "%s %p Exit", __PRETTY_FUNCTION__, _p); \ + } \ + const void* _p; \ +} traceObj(x); +#endif /* DISABLE_TRACE */ + +#endif /* __LOG_H__ */ + http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log4cxx.properties ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log4cxx.properties b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log4cxx.properties new file mode 100644 index 0000000..1e373e4 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/log4cxx.properties @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# Set root logger level to TRACE and its only appender to A1. +log4j.rootLogger=TRACE, A1 + +# A1 is set to be a ConsoleAppender. +log4j.appender.A1=org.apache.log4cxx.ConsoleAppender + +# A1 uses PatternLayout. +log4j.appender.A1.layout=org.apache.log4cxx.PatternLayout +log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +log4j.category.zkfuse=TRACE + http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/mutex.h ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/mutex.h b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/mutex.h new file mode 100644 index 0000000..86c4604 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/mutex.h @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef __MUTEX_H__ +#define __MUTEX_H__ + +#include <pthread.h> +#include <errno.h> +#include <sys/time.h> + +#include "log.h" + +START_ZKFUSE_NAMESPACE + +class Cond; + +/** + * A wrapper around a recursive pthread mutex. + */ +class Mutex { + friend class Cond; + public: + Mutex() { + pthread_mutexattr_init( &m_mutexAttr ); + pthread_mutexattr_settype( &m_mutexAttr, PTHREAD_MUTEX_RECURSIVE_NP ); + pthread_mutex_init( &mutex, &m_mutexAttr ); + } + ~Mutex() { + pthread_mutex_destroy(&mutex); + pthread_mutexattr_destroy( &m_mutexAttr ); + } + void Acquire() { Lock(); } + void Release() { Unlock(); } + void Lock() { + pthread_mutex_lock(&mutex); + } + int TryLock() { + return pthread_mutex_trylock(&mutex); + } + void Unlock() { + pthread_mutex_unlock(&mutex); + } + private: + pthread_mutex_t mutex; + pthread_mutexattr_t m_mutexAttr; +}; + +/** + * Locks the given mutex on construction and unlocks it on destruction. + */ +class AutoLock { + public: + AutoLock(Mutex& mutex) : _mutex(mutex) { + mutex.Lock(); + } + ~AutoLock() { + _mutex.Unlock(); + } + private: + friend class AutoUnlockTemp; + Mutex& _mutex; +}; + +/** + * Temporarily releases the mutex held by an {@link AutoLock}: the mutex is + * unlocked on construction and locked again on destruction. + */ +class AutoUnlockTemp { + public: + AutoUnlockTemp(AutoLock & autoLock) : _autoLock(autoLock) { + _autoLock._mutex.Unlock(); + } + ~AutoUnlockTemp() { + _autoLock._mutex.Lock(); + } + private: + AutoLock & _autoLock; +}; + +/** + * A wrapper around a pthread condition variable. + */ +class Cond { + public: + /** + * Initializes the condition variable with the default attributes. + */ + Cond() { + //a NULL attribute pointer selects the default attributes; this + //avoids a function-local static attribute object whose lazy + //initialization was not synchronized between threads + pthread_cond_init(&_cond, NULL); + } + ~Cond() { + pthread_cond_destroy(&_cond); + } + + /** + * Waits on the condition using the given mutex. + */ + void Wait(Mutex& mutex) { + pthread_cond_wait(&_cond, &mutex.mutex); + } + + /** + * Waits on the condition using the given mutex for a limited time. + * + * @param mutex the mutex associated with the wait + * @param timeout the maximum time to wait, in milliseconds + * @return false if the wait timed out, true otherwise + */ + bool Wait(Mutex& mutex, long long int timeout) { + struct timeval now; + gettimeofday( &now, NULL ); + struct timespec abstime; + int64_t microSecs = now.tv_sec * 1000000LL + now.tv_usec; + microSecs += timeout * 1000; + abstime.tv_sec = microSecs / 1000000LL; + abstime.tv_nsec = (microSecs % 1000000LL) * 1000; + if (pthread_cond_timedwait(&_cond, &mutex.mutex, &abstime) == ETIMEDOUT) { + return false; + } else { + return true; + } + } + + 
void Signal() { + pthread_cond_signal(&_cond); + } + + private: + pthread_cond_t _cond; +}; + +/** + * A wrapper class for {@link Mutex} and {@link Cond}. + * + * <p> + * It bundles one mutex with one condition so that callers can lock, + * wait and notify through a single object. + */ +class Lock { + public: + + /** + * Locks the mutex. + */ + void lock() { + m_mutex.Lock(); + } + + /** + * Unlocks the mutex. + */ + void unlock() { + m_mutex.Unlock(); + } + + /** + * Waits on the condition using this lock's mutex. + * NOTE(review): pthread condition waits expect the mutex to be held by + * the caller -- callers presumably invoke lock() first; confirm at the + * call sites. + */ + void wait() { + m_cond.Wait( m_mutex ); + } + + /** + * Waits on the condition using this lock's mutex for a limited time. + * + * @param timeout the maximum time to wait; forwarded unchanged to + * Cond::Wait + * @return the value returned by Cond::Wait + */ + bool wait(long long int timeout) { + return m_cond.Wait( m_mutex, timeout ); + } + + /** + * Signals the condition. + */ + void notify() { + m_cond.Signal(); + } + + private: + + /** + * The mutex. + */ + Mutex m_mutex; + + /** + * The condition associated with this lock's mutex. + */ + Cond m_cond; +}; + +END_ZKFUSE_NAMESPACE + +#endif /* __MUTEX_H__ */ + http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.cc ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.cc b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.cc new file mode 100644 index 0000000..f1ed816 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.cc @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include <log.h> + +#include "thread.h" + +DEFINE_LOGGER( LOG, "Thread" ) + +START_ZKFUSE_NAMESPACE + +void Thread::Create(void* ctx, ThreadFunc func) +{ + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setstacksize(&attr, _stackSize); + int ret = pthread_create(&mThread, &attr, func, ctx); + if(ret != 0) { + LOG_FATAL( LOG, "pthread_create failed: %s", strerror(errno) ); + } + // pthread_attr_destroy(&attr); + _ctx = ctx; + _func = func; +} + +END_ZKFUSE_NAMESPACE http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.h ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.h b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.h new file mode 100644 index 0000000..0ed12d7 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/thread.h @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef __THREAD_H__ +#define __THREAD_H__ + +#include <errno.h> +#include <string.h> +#include <assert.h> +#include <pthread.h> + +#include "log.h" + +START_ZKFUSE_NAMESPACE + +class Thread { + public: + static const size_t defaultStackSize = 1024 * 1024; + typedef void* (*ThreadFunc) (void*); + Thread(size_t stackSize = defaultStackSize) + : _stackSize(stackSize), _ctx(NULL), _func(NULL) + { + memset( &mThread, 0, sizeof(mThread) ); + } + ~Thread() { } + + void Create(void* ctx, ThreadFunc func); + void Join() { + //avoid SEGFAULT because of unitialized mThread + //in case Create(...) was never called + if (_func != NULL) { + pthread_join(mThread, 0); + } + } + private: + pthread_t mThread; + void *_ctx; + ThreadFunc _func; + size_t _stackSize; +}; + + +template<typename T> +struct ThreadContext { + typedef void (T::*FuncPtr) (void); + ThreadContext(T& ctx, FuncPtr func) : _ctx(ctx), _func(func) {} + void run(void) { + (_ctx.*_func)(); + } + T& _ctx; + FuncPtr _func; +}; + +template<typename T> +void* ThreadExec(void *obj) { + ThreadContext<T>* tc = (ThreadContext<T>*)(obj); + assert(tc != 0); + tc->run(); + return 0; +} + +template <typename T> +class CXXThread : public Thread { + public: + typedef void (T::*FuncPtr) (void); + CXXThread(size_t stackSize = Thread::defaultStackSize) + : Thread(stackSize), ctx(0) {} + ~CXXThread() { if (ctx) delete ctx; } + + void Create(T& obj, FuncPtr func) { + assert(ctx == 0); + ctx = new ThreadContext<T>(obj, func); + Thread::Create(ctx, ThreadExec<T>); + } + + private: + ThreadContext<T>* ctx; +}; + + +END_ZKFUSE_NAMESPACE + +#endif /* __THREAD_H__ */ + http://git-wip-us.apache.org/repos/asf/zookeeper/blob/eab8c1eb/zookeeper-contrib/zookeeper-contrib-zkfuse/src/zkadapter.cc ---------------------------------------------------------------------- diff --git a/zookeeper-contrib/zookeeper-contrib-zkfuse/src/zkadapter.cc b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/zkadapter.cc new file mode 100644 index 
0000000..7f02fa3 --- /dev/null +++ b/zookeeper-contrib/zookeeper-contrib-zkfuse/src/zkadapter.cc @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <algorithm> +#include <iostream> + +#include "blockingqueue.h" +#include "thread.h" +#include "zkadapter.h" + +using namespace std; +using namespace zk; + +DEFINE_LOGGER( LOG, "zookeeper.adapter" ) +DEFINE_LOGGER( ZK_LOG, "zookeeper.core" ) + +/** + * \brief A helper class to initialize ZK logging. + * + * <p> + * The constructor derives the debug level passed to zoo_set_debug_level + * from the levels enabled for the "zookeeper.core" logger: debug (or + * trace, where available) selects ZOO_LOG_LEVEL_DEBUG, info selects + * ZOO_LOG_LEVEL_INFO, warn selects ZOO_LOG_LEVEL_WARN and anything else + * selects ZOO_LOG_LEVEL_ERROR. + */ +class InitZooKeeperLogging +{ + public: + InitZooKeeperLogging() { + if (ZK_LOG->isDebugEnabled() +#ifdef LOG4CXX_TRACE + || ZK_LOG->isTraceEnabled() +#endif + ) + { + zoo_set_debug_level( ZOO_LOG_LEVEL_DEBUG ); + } else if (ZK_LOG->isInfoEnabled()) { + zoo_set_debug_level( ZOO_LOG_LEVEL_INFO ); + } else if (ZK_LOG->isWarnEnabled()) { + zoo_set_debug_level( ZOO_LOG_LEVEL_WARN ); + } else { + zoo_set_debug_level( ZOO_LOG_LEVEL_ERROR ); + } + } +}; + +using namespace std; + +namespace zk +{ + +/** + * \brief This class provides logic for checking if a request can be retried. 
+ */ +class RetryHandler +{ + public: + RetryHandler(const ZooKeeperConfig &zkConfig) + : m_zkConfig(zkConfig) + { + if (zkConfig.getAutoReconnect()) { + retries = 2; + } else { + retries = 0; + } + } + + /** + * \brief Attempts to fix a side effect of the given RC. + * + * @param rc the ZK error code + * @return whether the error code has been handled and the caller should + * retry an operation the caused this error + */ + bool handleRC(int rc) + { + TRACE( LOG, "handleRC" ); + + //check if the given error code is recoverable + if (!retryOnError(rc)) { + return false; + } + LOG_TRACE( LOG, "RC: %d, retries left: %d", rc, retries ); + if (retries-- > 0) { + return true; + } else { + return false; + } + } + + private: + /** + * The ZK config. + */ + const ZooKeeperConfig &m_zkConfig; + + /** + * The number of outstanding retries. + */ + int retries; + + /** + * Checks whether the given error entitles this adapter + * to retry the previous operation. + * + * @param zkErrorCode one of the ZK error code + */ + static bool retryOnError(int zkErrorCode) + { + return (zkErrorCode == ZCONNECTIONLOSS || + zkErrorCode == ZOPERATIONTIMEOUT); + } +}; + + +//the implementation of the global ZK event watcher +void zkWatcher(zhandle_t *zh, int type, int state, const char *path, + void *watcherCtx) +{ + TRACE( LOG, "zkWatcher" ); + + //a workaround for buggy ZK API + string sPath = + (path == NULL || + state == ZOO_SESSION_EVENT || + state == ZOO_NOTWATCHING_EVENT) + ? 
"" + : string(path); + LOG_INFO( LOG, + "Received a ZK event - type: %d, state: %d, path: '%s'", + type, state, sPath.c_str() ); + ZooKeeperAdapter *zka = (ZooKeeperAdapter *)zoo_get_context(zh); + if (zka != NULL) { + zka->enqueueEvent( type, state, sPath ); + } else { + LOG_ERROR( LOG, + "Skipping ZK event (type: %d, state: %d, path: '%s'), " + "because ZK passed no context", + type, state, sPath.c_str() ); + } +} + + + +// ======================================================================= + +ZooKeeperAdapter::ZooKeeperAdapter(ZooKeeperConfig config, + ZKEventListener *listener, + bool establishConnection) + throw(ZooKeeperException) + : m_zkConfig(config), + mp_zkHandle(NULL), + m_terminating(false), + m_connected(false), + m_state(AS_DISCONNECTED) +{ + TRACE( LOG, "ZooKeeperAdapter" ); + + resetRemainingConnectTimeout(); + + //enforce setting up appropriate ZK log level + static InitZooKeeperLogging INIT_ZK_LOGGING; + + if (listener != NULL) { + addListener(listener); + } + + //start the event dispatcher thread + m_eventDispatcher.Create( *this, &ZooKeeperAdapter::processEvents ); + + //start the user event dispatcher thread + m_userEventDispatcher.Create( *this, &ZooKeeperAdapter::processUserEvents ); + + //optionally establish the connection + if (establishConnection) { + reconnect(); + } +} + +ZooKeeperAdapter::~ZooKeeperAdapter() +{ + TRACE( LOG, "~ZooKeeperAdapter" ); + + try { + disconnect(); + } catch (std::exception &e) { + LOG_ERROR( LOG, + "An exception while disconnecting from ZK: %s", + e.what() ); + } + m_terminating = true; + m_userEventDispatcher.Join(); + m_eventDispatcher.Join(); +} + +void +ZooKeeperAdapter::validatePath(const string &path) throw(ZooKeeperException) +{ + TRACE( LOG, "validatePath" ); + + if (path.find( "/" ) != 0) { + throw ZooKeeperException( string("Node path must start with '/' but" + "it was '") + + path + + "'" ); + } + if (path.length() > 1) { + if (path.rfind( "/" ) == path.length() - 1) { + throw 
ZooKeeperException( string("Node path must not end with " + "'/' but it was '") + + path + + "'" ); + } + if (path.find( "//" ) != string::npos) { + throw ZooKeeperException( string("Node path must not contain " + "'//' but it was '") + + path + + "'" ); + } + } +} + +void +ZooKeeperAdapter::disconnect() +{ + TRACE( LOG, "disconnect" ); + LOG_TRACE( LOG, "mp_zkHandle: %p, state %d", mp_zkHandle, m_state ); + + m_stateLock.lock(); + if (mp_zkHandle != NULL) { + zookeeper_close( mp_zkHandle ); + mp_zkHandle = NULL; + setState( AS_DISCONNECTED ); + } + m_stateLock.unlock(); +} + +void +ZooKeeperAdapter::reconnect() throw(ZooKeeperException) +{ + TRACE( LOG, "reconnect" ); + + m_stateLock.lock(); + //clear the connection state + disconnect(); + + //establish a new connection to ZooKeeper + mp_zkHandle = zookeeper_init( m_zkConfig.getHosts().c_str(), + zkWatcher, + m_zkConfig.getLeaseTimeout(), + NULL, this, 0); + resetRemainingConnectTimeout(); + if (mp_zkHandle != NULL) { + setState( AS_CONNECTING ); + m_stateLock.unlock(); + } else { + m_stateLock.unlock(); + throw ZooKeeperException( + string("Unable to connect to ZK running at '") + + m_zkConfig.getHosts() + "'" ); + } + + LOG_DEBUG( LOG, "mp_zkHandle: %p, state %d", mp_zkHandle, m_state ); +} + +void +ZooKeeperAdapter::handleEvent(int type, int state, const string &path) +{ + TRACE( LOG, "handleEvent" ); + LOG_TRACE( LOG, + "type: %d, state %d, path: %s", + type, state, path.c_str() ); + Listener2Context context, context2; + //ignore internal ZK events + if (type != ZOO_SESSION_EVENT && type != ZOO_NOTWATCHING_EVENT) { + m_zkContextsMutex.Acquire(); + //check if the user context is available + if (type == ZOO_CHANGED_EVENT || type == ZOO_DELETED_EVENT) { + //we may have two types of interest here, + //in this case lets try to notify twice + context = findAndRemoveListenerContext( GET_NODE_DATA, path ); + context2 = findAndRemoveListenerContext( NODE_EXISTS, path ); + if (context.empty()) { + //make sure that the 
2nd context is NULL and + // assign it to the 1st one + context = context2; + context2.clear(); + } + } else if (type == ZOO_CHILD_EVENT) { + context = findAndRemoveListenerContext( GET_NODE_CHILDREN, path ); + } else if (type == ZOO_CREATED_EVENT) { + context = findAndRemoveListenerContext( NODE_EXISTS, path ); + } + m_zkContextsMutex.Release(); + } + + handleEvent( type, state, path, context ); + if (!context2.empty()) { + handleEvent( type, state, path, context2 ); + } +} + +void +ZooKeeperAdapter::handleEvent(int type, + int state, + const string &path, + const Listener2Context &listeners) +{ + TRACE( LOG, "handleEvents" ); + + if (listeners.empty()) { + //propagate with empty context + ZKWatcherEvent event(type, state, path); + fireEvent( event ); + } else { + for (Listener2Context::const_iterator i = listeners.begin(); + i != listeners.end(); + ++i) { + ZKWatcherEvent event(type, state, path, i->second); + if (i->first != NULL) { + fireEvent( i->first, event ); + } else { + fireEvent( event ); + } + } + } +} + +void +ZooKeeperAdapter::enqueueEvent(int type, int state, const string &path) +{ + TRACE( LOG, "enqueueEvents" ); + + m_events.put( ZKWatcherEvent( type, state, path ) ); +} + +void +ZooKeeperAdapter::processEvents() +{ + TRACE( LOG, "processEvents" ); + + while (!m_terminating) { + bool timedOut = false; + ZKWatcherEvent source = m_events.take( 100, &timedOut ); + if (!timedOut) { + if (source.getType() == ZOO_SESSION_EVENT) { + LOG_INFO( LOG, + "Received SESSION event, state: %d. 
Adapter state: %d", + source.getState(), m_state ); + m_stateLock.lock(); + if (source.getState() == ZOO_CONNECTED_STATE) { + m_connected = true; + resetRemainingConnectTimeout(); + setState( AS_CONNECTED ); + } else if (source.getState() == ZOO_CONNECTING_STATE) { + m_connected = false; + setState( AS_CONNECTING ); + } else if (source.getState() == ZOO_EXPIRED_SESSION_STATE) { + LOG_INFO( LOG, "Received EXPIRED_SESSION event" ); + setState( AS_SESSION_EXPIRED ); + } + m_stateLock.unlock(); + } + m_userEvents.put( source ); + } + } +} + +void +ZooKeeperAdapter::processUserEvents() +{ + TRACE( LOG, "processUserEvents" ); + + while (!m_terminating) { + bool timedOut = false; + ZKWatcherEvent source = m_userEvents.take( 100, &timedOut ); + if (!timedOut) { + try { + handleEvent( source.getType(), + source.getState(), + source.getPath() ); + } catch (std::exception &e) { + LOG_ERROR( LOG, + "Unable to process event (type: %d, state: %d, " + "path: %s), because of exception: %s", + source.getType(), + source.getState(), + source.getPath().c_str(), + e.what() ); + } + } + } +} + +void +ZooKeeperAdapter::registerContext(WatchableMethod method, + const string &path, + ZKEventListener *listener, + ContextType context) +{ + TRACE( LOG, "registerContext" ); + + m_zkContexts[method][path][listener] = context; +} + +ZooKeeperAdapter::Listener2Context +ZooKeeperAdapter::findAndRemoveListenerContext(WatchableMethod method, + const string &path) +{ + TRACE( LOG, "findAndRemoveListenerContext" ); + + Listener2Context listeners; + Path2Listener2Context::iterator elem = m_zkContexts[method].find( path ); + if (elem != m_zkContexts[method].end()) { + listeners = elem->second; + m_zkContexts[method].erase( elem ); + } + return listeners; +} + +void +ZooKeeperAdapter::setState(AdapterState newState) +{ + TRACE( LOG, "setState" ); + if (newState != m_state) { + LOG_INFO( LOG, "Adapter state transition: %d -> %d", m_state, newState ); + m_state = newState; + m_stateLock.notify(); + } else 
{ + LOG_TRACE( LOG, "New state same as the current: %d", newState ); + } +} + + +//TODO move this code to verifyConnection so reconnect() +//is called from one place only +void +ZooKeeperAdapter::waitUntilConnected() + throw(ZooKeeperException) +{ + TRACE( LOG, "waitUntilConnected" ); + long long int timeout = getRemainingConnectTimeout(); + LOG_INFO( LOG, + "Waiting up to %lld ms until a connection to ZK is established", + timeout ); + bool connected; + if (timeout > 0) { + long long int toWait = timeout; + while (m_state != AS_CONNECTED && toWait > 0) { + //check if session expired and reconnect if so + if (m_state == AS_SESSION_EXPIRED) { + LOG_INFO( LOG, + "Reconnecting because the current session has expired" ); + reconnect(); + } + struct timeval now; + gettimeofday( &now, NULL ); + int64_t milliSecs = -(now.tv_sec * 1000LL + now.tv_usec / 1000); + LOG_TRACE( LOG, "About to wait %lld ms", toWait ); + m_stateLock.wait( toWait ); + gettimeofday( &now, NULL ); + milliSecs += now.tv_sec * 1000LL + now.tv_usec / 1000; + toWait -= milliSecs; + } + waitedForConnect( timeout - toWait ); + LOG_INFO( LOG, "Waited %lld ms", timeout - toWait ); + } + connected = (m_state == AS_CONNECTED); + if (!connected) { + if (timeout > 0) { + LOG_WARN( LOG, "Timed out while waiting for connection to ZK" ); + throw ZooKeeperException("Timed out while waiting for " + "connection to ZK"); + } else { + LOG_ERROR( LOG, "Global timeout expired and still not connected to ZK" ); + throw ZooKeeperException("Global timeout expired and still not " + "connected to ZK"); + } + } + LOG_INFO( LOG, "Connected!" ); +} + +void +ZooKeeperAdapter::verifyConnection() throw(ZooKeeperException) +{ + TRACE( LOG, "verifyConnection" ); + + m_stateLock.lock(); + try { + if (m_state == AS_DISCONNECTED) { + throw ZooKeeperException("Disconnected from ZK. 
" \ + "Please use reconnect() before attempting to use any ZK API"); + } else if (m_state != AS_CONNECTED) { + LOG_TRACE( LOG, "Checking if need to reconnect..." ); + //we are not connected, so check if connection in progress... + if (m_state != AS_CONNECTING) { + LOG_TRACE( LOG, + "yes. Checking if allowed to auto-reconnect..." ); + //...not in progres, so check if we can reconnect + if (!m_zkConfig.getAutoReconnect()) { + //...too bad, disallowed :( + LOG_TRACE( LOG, "no. Sorry." ); + throw ZooKeeperException("ZK connection is down and " + "auto-reconnect is not allowed"); + } else { + LOG_TRACE( LOG, "...yes. About to reconnect" ); + } + //...we are good to retry the connection + reconnect(); + } else { + LOG_TRACE( LOG, "...no, already in CONNECTING state" ); + } + //wait until the connection is established + waitUntilConnected(); + } + } catch (ZooKeeperException &e) { + m_stateLock.unlock(); + throw; + } + m_stateLock.unlock(); +} + +bool +ZooKeeperAdapter::createNode(const string &path, + const string &value, + int flags, + bool createAncestors, + string &returnPath) + throw(ZooKeeperException) +{ + TRACE( LOG, "createNode (internal)" ); + validatePath( path ); + + const int MAX_PATH_LENGTH = 1024; + char realPath[MAX_PATH_LENGTH]; + realPath[0] = 0; + + int rc; + RetryHandler rh(m_zkConfig); + do { + verifyConnection(); + rc = zoo_create( mp_zkHandle, + path.c_str(), + value.c_str(), + value.length(), + &ZOO_OPEN_ACL_UNSAFE, + flags, + realPath, + MAX_PATH_LENGTH ); + } while (rc != ZOK && rh.handleRC(rc)); + if (rc != ZOK) { + if (rc == ZNODEEXISTS) { + //the node already exists + LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() ); + return false; + } else if (rc == ZNONODE && createAncestors) { + LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() ); + //one of the ancestors doesn't exist so lets start from the root + //and make sure the whole path exists, creating missing nodes if + //necessary + for (string::size_type pos = 1; pos != string::npos; ) { 
+ pos = path.find( "/", pos ); + if (pos != string::npos) { + try { + createNode( path.substr( 0, pos ), "", 0, true ); + } catch (ZooKeeperException &e) { + throw ZooKeeperException( string("Unable to create " + "node ") + + path, + rc ); + } + pos++; + } else { + //no more path components + return createNode( path, value, flags, false, returnPath ); + } + } + } + LOG_ERROR( LOG,"Error %d for %s", rc, path.c_str() ); + throw ZooKeeperException( string("Unable to create node ") + + path, + rc ); + } else { + LOG_INFO( LOG, "%s has been created", realPath ); + returnPath = string( realPath ); + return true; + } +} + +bool +ZooKeeperAdapter::createNode(const string &path, + const string &value, + int flags, + bool createAncestors) + throw(ZooKeeperException) +{ + TRACE( LOG, "createNode" ); + + string createdPath; + return createNode( path, value, flags, createAncestors, createdPath ); +} + +int64_t +ZooKeeperAdapter::createSequence(const string &path, + const string &value, + int flags, + bool createAncestors) + throw(ZooKeeperException) +{ + TRACE( LOG, "createSequence" ); + + string createdPath; + bool result = createNode( path, + value, + flags | ZOO_SEQUENCE, + createAncestors, + createdPath ); + if (!result) { + return -1; + } else { + //extract sequence number from the returned path + if (createdPath.find( path ) != 0) { + throw ZooKeeperException( string("Expecting returned path '") + + createdPath + + "' to start with '" + + path + + "'" ); + } + string seqSuffix = + createdPath.substr( path.length(), + createdPath.length() - path.length() ); + char *ptr = NULL; + int64_t seq = strtol( seqSuffix.c_str(), &ptr, 10 ); + if (ptr != NULL && *ptr != '\0') { + throw ZooKeeperException( string("Expecting a number but got ") + + seqSuffix ); + } + return seq; + } +} + +bool +ZooKeeperAdapter::deleteNode(const string &path, + bool recursive, + int version) + throw(ZooKeeperException) +{ + TRACE( LOG, "deleteNode" ); + + validatePath( path ); + + int rc; + 
RetryHandler rh(m_zkConfig); + do { + verifyConnection(); + rc = zoo_delete( mp_zkHandle, path.c_str(), version ); + } while (rc != ZOK && rh.handleRC(rc)); + if (rc != ZOK) { + if (rc == ZNONODE) { + LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() ); + return false; + } + if (rc == ZNOTEMPTY && recursive) { + LOG_WARN( LOG, "Error %d for %s", rc, path.c_str() ); + //get all children and delete them recursively... + vector<string> nodeList; + getNodeChildren( nodeList, path, NULL ); + for (vector<string>::const_iterator i = nodeList.begin(); + i != nodeList.end(); + ++i) { + deleteNode( *i, true ); + } + //...and finally attempt to delete the node again + return deleteNode( path, false ); + } + LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() ); + throw ZooKeeperException( string("Unable to delete node ") + path, + rc ); + } else { + LOG_INFO( LOG, "%s has been deleted", path.c_str() ); + return true; + } +} + +bool +ZooKeeperAdapter::nodeExists(const string &path, + ZKEventListener *listener, + void *context, Stat *stat) + throw(ZooKeeperException) +{ + TRACE( LOG, "nodeExists" ); + + validatePath( path ); + + struct Stat tmpStat; + if (stat == NULL) { + stat = &tmpStat; + } + memset( stat, 0, sizeof(Stat) ); + + int rc; + RetryHandler rh(m_zkConfig); + do { + verifyConnection(); + if (context != NULL) { + m_zkContextsMutex.Acquire(); + rc = zoo_exists( mp_zkHandle, + path.c_str(), + (listener != NULL ? 1 : 0), + stat ); + if (rc == ZOK || rc == ZNONODE) { + registerContext( NODE_EXISTS, path, listener, context ); + } + m_zkContextsMutex.Release(); + } else { + rc = zoo_exists( mp_zkHandle, + path.c_str(), + (listener != NULL ? 
1 : 0), + stat ); + } + } while (rc != ZOK && rh.handleRC(rc)); + if (rc != ZOK) { + if (rc == ZNONODE) { + LOG_TRACE( LOG, "Node %s does not exist", path.c_str() ); + return false; + } + LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() ); + throw ZooKeeperException( + string("Unable to check existence of node ") + path, + rc ); + } else { + return true; + } +} + +void +ZooKeeperAdapter::getNodeChildren(vector<string> &nodeList, + const string &path, + ZKEventListener *listener, + void *context) + throw (ZooKeeperException) +{ + TRACE( LOG, "getNodeChildren" ); + + validatePath( path ); + + String_vector children; + memset( &children, 0, sizeof(children) ); + + int rc; + RetryHandler rh(m_zkConfig); + do { + verifyConnection(); + if (context != NULL) { + m_zkContextsMutex.Acquire(); + rc = zoo_get_children( mp_zkHandle, + path.c_str(), + (listener != NULL ? 1 : 0), + &children ); + if (rc == ZOK) { + registerContext( GET_NODE_CHILDREN, path, listener, context ); + } + m_zkContextsMutex.Release(); + } else { + rc = zoo_get_children( mp_zkHandle, + path.c_str(), + (listener != NULL ? 
1 : 0), + &children ); + } + } while (rc != ZOK && rh.handleRC(rc)); + if (rc != ZOK) { + LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() ); + throw ZooKeeperException( string("Unable to get children of node ") + + path, + rc ); + } else { + for (int i = 0; i < children.count; ++i) { + //convert each child's path from relative to absolute + string absPath(path); + if (path != "/") { + absPath.append( "/" ); + } + absPath.append( children.data[i] ); + nodeList.push_back( absPath ); + } + //make sure the order is always deterministic + sort( nodeList.begin(), nodeList.end() ); + } +} + +string +ZooKeeperAdapter::getNodeData(const string &path, + ZKEventListener *listener, + void *context, Stat *stat) + throw(ZooKeeperException) +{ + TRACE( LOG, "getNodeData" ); + + validatePath( path ); + + const int MAX_DATA_LENGTH = 128 * 1024; + char buffer[MAX_DATA_LENGTH]; + memset( buffer, 0, MAX_DATA_LENGTH ); + struct Stat tmpStat; + if (stat == NULL) { + stat = &tmpStat; + } + memset( stat, 0, sizeof(Stat) ); + + int rc; + int len; + RetryHandler rh(m_zkConfig); + do { + verifyConnection(); + len = MAX_DATA_LENGTH - 1; + if (context != NULL) { + m_zkContextsMutex.Acquire(); + rc = zoo_get( mp_zkHandle, + path.c_str(), + (listener != NULL ? 1 : 0), + buffer, &len, stat ); + if (rc == ZOK) { + registerContext( GET_NODE_DATA, path, listener, context ); + } + m_zkContextsMutex.Release(); + } else { + rc = zoo_get( mp_zkHandle, + path.c_str(), + (listener != NULL ? 
1 : 0), + buffer, &len, stat ); + } + } while (rc != ZOK && rh.handleRC(rc)); + if (rc != ZOK) { + LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() ); + throw ZooKeeperException( + string("Unable to get data of node ") + path, rc + ); + } else { + if (len == -1) { + len = 0; + } + return string( buffer, len ); + } +} + +void +ZooKeeperAdapter::setNodeData(const string &path, + const string &value, + int version) + throw(ZooKeeperException) +{ + TRACE( LOG, "setNodeData" ); + + validatePath( path ); + + int rc; + RetryHandler rh(m_zkConfig); + do { + verifyConnection(); + rc = zoo_set( mp_zkHandle, + path.c_str(), + value.c_str(), + value.length(), + version); + } while (rc != ZOK && rh.handleRC(rc)); + if (rc != ZOK) { + LOG_ERROR( LOG, "Error %d for %s", rc, path.c_str() ); + throw ZooKeeperException( string("Unable to set data for node ") + + path, + rc ); + } +} + +} /* end of 'namespace zk' */ +
