Hi, > I wrapped the boost ipc message queue into Python using Boost.Python. > Everything goes fine except I found that > > 1. the try_receive / try_send will block python when the mq is empty / full. > 2. when I use the wrapped message queue in one Python thread, any > other Python thread would be blocked and won't execute.
I don't know about the boost ipc message queue but my guess is try_receive(...) and try_send(...) are blocking calls(?) and you'll need to allow Python threads to run while blocking. I.e. you'd need to release Python's global interpreter lock (GIL) before the blocking call and reacquire it when the call returns by using the Py_BEGIN_ALLOW_THREADS / Py_END_ALLOW_THREADS macros. Looks to me like you could do so in your message_queue class's methods, something similar to

    boost::python::str message_queue::try_receive(size_t m_size)
    {
        std::vector<char> buffer(m_size);  // owned buffer, no leak on exceptions
        bi::message_queue::size_type recvd_size = 0;
        unsigned int priority = 0;
        bool got_message;
        Py_BEGIN_ALLOW_THREADS;
        got_message = p_message_queue->try_receive(&buffer[0], m_size, recvd_size, priority);
        Py_END_ALLOW_THREADS;
        // return only the bytes actually received (nothing if the queue was empty)
        return boost::python::str(&buffer[0], got_message ? recvd_size : 0);
    }

(fully untested code!) Note that there might be additional intricacies with regard to waking up the blocking call when a message arrives in the queue and/or signal handling, e.g. making the program interruptible while in the blocking call. This depends on the queue implementation and your requirements (play well with signal handlers registered on the Python side, ...). The snippets below are samples from our wrapping of a message queue of the commercial messaging middleware TIB/Rendezvous - it's old code and could be improved but works for us and might just give you an idea plus some documentation.
Hope it helps, Holger // file helpers/debug_config.hpp #ifndef DEBUG_CONFIG_HPP #define DEBUG_CONFIG_HPP // __GNUC__ is defined for both C and C++ #if defined(__GNUC__) #define __PRETTY_FUNCTION__ __PRETTY_FUNCTION__ #elif (defined(__SUNPRO_C) || defined(__SUNPRO_CC)) #define __PRETTY_FUNCTION__ __func__ #else #define __PRETTY_FUNCTION__ __func__ #endif // if defined(__GNUC__) #include <iostream> // See proposed solution by Graham Dumpleton here for debug macros: // http://www.velocityreviews.com/forums/t280342-debugging-macros.html // IMHO this has 2 advantages: // - integrates much nicer with code formatting than #ifdef DEBUG ... #endif // - gets compiled even if DEBUG is unset so the debug code gets checked // by the compiler (and optimized away, then, hopefully :) //FIXME: Does this *really* get optimized away? For gcc (4.6.1) the unused //FIXME: output code is left out (checked using the strings cmd), but for //FIXME: Solaris studio i can grep the debug code in the resulting object //FIXME: file in a simple example program... Must we care? 
// DEBUG switches on all debugging output #ifdef DEBUG #define DEBUG_TRACE #define DEBUG_FUNC #define DEBUG_CTOR #define DEBUG_DTOR #define DEBUG_MODULES #endif // use these switches to switch on specific debug tracing #ifdef DEBUG_TRACE #define DEBUGTRACER_TRACE if (0) ; else std::cout #define DEBUGTRACER_CONDITIONAL if (0) ; else #else #define DEBUGTRACER_TRACE if (1) ; else std::cout #define DEBUGTRACER_CONDITIONAL if (1) ; else #endif #ifdef DEBUG_FUNC #define DEBUGTRACER_FUNC if (0) ; else std::cout #else #define DEBUGTRACER_FUNC if (1) ; else std::cout #endif #ifdef DEBUG_CTOR #define DEBUGTRACER_CTOR if (0) ; else std::cout #else #define DEBUGTRACER_CTOR if (1) ; else std::cout #endif #ifdef DEBUG_DTOR #define DEBUGTRACER_DTOR if (0) ; else std::cout #else #define DEBUGTRACER_DTOR if (1) ; else std::cout #endif #ifdef DEBUG_MODULES #define DEBUGTRACER_MODULES if (0) ; else std::cout #else #define DEBUGTRACER_MODULES if (1) ; else std::cout #endif #endif //DEBUG_CONFIG_HPP // file tibrvqueue.hpp // See tibrvque.cpp for extensive background documentation #ifndef _ThreadedTibrvQueue_hpp #define _ThreadedTibrvQueue_hpp #include <tibrvcpp.h> #include <cassert> #include <list> #include <signal.h> #include "helpers/debug_config.hpp" class ThreadedTibrvQueue : public TibrvQueue { public: ThreadedTibrvQueue(); void waitEvent(); bool waitEvent(double timeout); TibrvStatus destroy(); TibrvStatus destroy(TibrvQueueOnComplete* completeCB, const void* closure = NULL); private: static void _myHook(tibrvQueue eventQueue, void* closure); static void sighandler(int); static void installSignalHandlers(); static void removeSignalHandlers(); private: pthread_cond_t m_cond; pthread_mutex_t m_mutex; typedef std::list<ThreadedTibrvQueue*> instance_list; static instance_list m_instances; static pthread_mutex_t m_instances_mutex; static int m_waiting; typedef void (*sighandler_t)(int); static sighandler_t m_sighandler[NSIG]; }; #endif // file tibrvqueue.cpp // ThreadedTibrvQueue: A 
TibrvQueue subclass to enable multithreaded python // programs with Rendezvous // // As the TIB/Rv library does not know anything about Python, its // functions/methods do of course not release the python GIL, even in situations // where this would be desired or even essential. E.g. // * TibrvQueue::dispatch() // * TibrvQueue::timedDispatch() // won't release the GIL prior to blocking (either endlessly or for a set interval). // // This means during waiting for an TIB/Rv event (incoming UDP message (which is // also I/O), timer event, I/O Event (platform-dependant)) no other thread will // be allowed to run, as the thread that called dispatch/timedDispatch (usually // the main thread in its main loop) holds the GIL, blocking other threads from // doing their workings. // Therefore, it is essential to release the GIL in such situations. // // The ThreadedTibrvQueue solves this by introducing // * the waitEvent() method which // 1. installs a signal handler to catch "break" signals // 2. releases the GIL // 3. starts waiting for the condition member variable m_cond (i.e. goes to // sleep) // and // * a hook callback method _myHook() (which gets called by the TIB/Rv event // mechanisms whenever an event is put into the queue) that signals the // condition variable m_cond. This notifies the waiting processing in // waitEvent() to continue running, which will then remove the signal handler // and reacquire the GIL. // The signal handler is needed to make the program interruptible e.g. by // ctrl-c while waiting for an event to be put in the queue; it will also signal // the m_cond condition variable to allow waitEvent() to proceed. // // Used together with the normal dispatch/timedDispatch methods this allows for // running multithreaded programs. You'll have to use this in combination, i.e. 
// the dispatch call must be preceded by a waitEvent() call so your main Rv loop // implementation will basically look like this: // // queue = ThreadedTibrvQueue() // while 1: // queue.waitEvent() // queue.dispatch() // // Possible improvements: // * make waitEvent() a private method and override the // dispatch()/timedDispatch() methods (poll() also?) to call the private // waitEvent(). This would make for a better API interface. // * unify ThreadedTibrvQueue constructor behaviour with standard TibrvQueue // behaviour: ThreadedTibrvQueue performs a create() call in the constructor // while TibrvQueue does not and you have to to this in a second step // // Alternatives: // It is essentially possible to run multithreaded programs with the standard // Tibrv queue if you forgo using blocking dispatch; however, caution is // recommended. Imagine you try to run a threaded program with another running // thread using a main thread loop like // // sys.setcheckinterval(1) # minimal "tick" count for thread switching // queue = Tibrv.defaultQueue() // while 1: // queue.timedDispatch(0.1) // print "..." // // This *will not work*, i.e. the other running thread will not be able to run // properly! The reason for this is that when timedDispatch() returns, while // the main loop thread will release the GIL it is itself waiting to reaqcuire // it immediately. This means we now have 2 threads waiting to proceed: Our // other thread and the main thread wanting to continue with the next loop // iteration. The way thread switching is implemented (at least up to Python 2.6) // one or the other may be scheduled to run, depending on OS scheduler decisions // (see http://www.dabeaz.com/python/UnderstandingGIL.pdf for a thorough // explanation). This can result in the other thread not ever getting the chance // to continue. 
// It is still possible to support threaded programs with a construct like // // queue = Tibrv.defaultQueue() // while 1: // queue.timedDispatch(0.1) // time.sleep(0.0001) // // Blocking I/O forces release of the GIL, as does time.sleep() (the Python C // macro Py_BEGIN_ALLOW_THREADS basically does that: "hey, I~m entering some // blocking operation, here~s the GIL back" [quoted from // http://jessenoller.com/2009/02/01/python-threads-and-the-global-interpreter-lock/, // which is also an excellent introduction to threading and the GIL]). So this // really gives our 2nd thread the possibility to run. // Note, however, that while the main thread waits in timedDispatch(X) for X // seconds // * no signals will be retrieved to allow program interruption // * the other thread(s) are not able to run // which means X must be reasonably small. In consequence, such a solution would // be suboptimal at best. #include <boost/python.hpp> // silence warning: "_FILE_OFFSET_BITS" redefined #include "tibrvqueue.hpp" #include <queue.h> #include <boost/python/errors.hpp> //#include <boost/python/objects.hpp> //#include <boost/python/conversions.hpp> #include <boost/format.hpp> #include <iostream> #include <string> #include <pythread.h> #include <pthread.h> #include <math.h> #include <signal.h> #include <string.h> #include <errno.h> #include <sys/time.h> //#undef DEBUG //#define DEBUG #ifdef DEBUG // FIXME: This is provided on Solaris but not Linux; is there some equivalent# // we could use instead? #ifdef SIG2STR_MAX std::ostream& operator<<(std::ostream& to, const sigset_t *sigset) { char buf[SIG2STR_MAX]; for (int s=0;s<MAXSIG;s++) { sig2str(s, buf); to << buf << "(" << s << "):" << (sigismember(sigset, s)) << std::endl; } return to; } std::ostream &operator<<(std::ostream &to,const struct sigaction& sa) { to << "sa_flags=" << sa.sa_flags << "("; to << ( (sa.sa_flags & SA_SIGINFO ) ? "SA_SIGINFO " : "" ); to << ( (sa.sa_flags & SA_RESTART ) ? 
"SA_RESTART " : "" ); to << ( (sa.sa_flags & SA_ONSTACK ) ? "SA_ONSTACK " : "" ); to << ( (sa.sa_flags & SA_NODEFER ) ? "SA_NODEFER " : "" ); to << ( (sa.sa_flags & SA_RESETHAND ) ? "SA_RESETHAND " : "" ); to << ") sa_handler=" << (void*)sa.sa_handler << " sa_sigaction=" << (void*)sa.sa_sigaction; return to; } void dumpsigs() { sigset_t sigset; pthread_sigmask(0, 0, &sigset); std::cout << "thread: " << pthread_self() << " sigset: " << &sigset << std::endl; struct sigaction sa; sigaction(SIGINT, 0, &sa); std::cout << "SIGINT:" << sa << std::endl; sigaction(SIGTERM, 0, &sa); std::cout << "SIGTERM:" << sa << std::endl; sigaction(SIGQUIT, 0, &sa); std::cout << "SIGQUIT:" << sa << std::endl; } #endif // ifdef SIG2STR_MAX #endif // ifdef DEBUG ThreadedTibrvQueue::instance_list ThreadedTibrvQueue::m_instances; pthread_mutex_t ThreadedTibrvQueue::m_instances_mutex; int ThreadedTibrvQueue::m_waiting = 0; ThreadedTibrvQueue::sighandler_t ThreadedTibrvQueue::m_sighandler[NSIG]; ThreadedTibrvQueue::ThreadedTibrvQueue() : TibrvQueue() { // this section is guarded by the global interpreter lock if (m_instances.size() == 0) { pthread_mutex_init(&m_instances_mutex, 0); } // prevent signal handlers from iterating m_instances at this point in time pthread_mutex_lock( &m_instances_mutex ); m_instances.push_back(this); pthread_mutex_unlock( &m_instances_mutex ); TibrvStatus status = create(); if (status != TIBRV_OK) { throw std::runtime_error((boost::format("TibrvQueue::create: %1%") % status.getText()).str()); } pthread_mutex_init(&m_mutex, 0); pthread_cond_init(&m_cond, 0); status = tibrvQueue_SetHook(getHandle(), &_myHook, this); if (status != TIBRV_OK) { throw std::runtime_error((boost::format("tibrvQueue_SetHook: %1%") % status.getText()).str()); } } TibrvStatus ThreadedTibrvQueue::destroy() { DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl; return destroy(NULL, NULL); } // must not repeat closure's default arg value here: // in tibrvqueue.hpp: TibrvStatus 
destroy(TibrvQueueOnComplete* completeCB, const void* closure=NULL); TibrvStatus ThreadedTibrvQueue::destroy(TibrvQueueOnComplete* completeCB, const void* closure) { DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl; if (getHandle() != TIBRV_INVALID_ID) { TibrvStatus status = tibrvQueue_RemoveHook(getHandle()); pthread_cond_destroy(&m_cond); pthread_mutex_destroy(&m_mutex); if (status != TIBRV_OK) { throw std::runtime_error((boost::format("tibrvQueue_RemoveHook: %1%") % status.getText()).str()); } pthread_mutex_lock( &m_instances_mutex ); m_instances.remove(this); pthread_mutex_unlock( &m_instances_mutex ); } DEBUGTRACER_FUNC << "<-- " << __PRETTY_FUNCTION__ << std::endl; return TibrvQueue::destroy(completeCB, closure); } void ThreadedTibrvQueue::waitEvent() { DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl; tibrv_u32 available = 0; TibrvStatus status; int cond_status = 0; assert(m_waiting>=0); if (!m_waiting) installSignalHandlers(); m_waiting++; Py_BEGIN_ALLOW_THREADS; DEBUGTRACER_TRACE << __PRETTY_FUNCTION__ << ": pthread_mutex_lock" << std::endl; pthread_mutex_lock( &m_mutex ); status = getCount(available); // while (!available && cond_status == 0 && status == TIBRV_OK) { if (!available && status == TIBRV_OK) { cond_status = pthread_cond_wait(&m_cond, &m_mutex); status = getCount(available); DEBUGTRACER_TRACE << __PRETTY_FUNCTION__ << "available = " << available << std::endl; } // assert(available); DEBUGTRACER_TRACE << __PRETTY_FUNCTION__ << ": pthread_mutex_unlock" << std::endl; pthread_mutex_unlock( &m_mutex ); Py_END_ALLOW_THREADS; m_waiting--; assert(m_waiting>=0); if (!m_waiting) removeSignalHandlers(); if (status != TIBRV_OK) { throw std::runtime_error((boost::format("TibrvQueue::getCount: %1%") % status.getText()).str()); } if (cond_status != 0) { //throw std::runtime_error(std::string("pthread_cond_wait failed")); // to raise a standard python exception, either create a custom exception // and register an exception 
translator, or do it manually like this: // FIXME: Should go to a helper lib that takes the message and the Exception const char message[] = "pthread_cond_wait failed"; PyErr_SetString(PyExc_OSError, message); boost::python::throw_error_already_set(); } #if 0 if (!available) { boost::python::tuple exc_value(boost::python::ref (BOOST_PYTHON_CONVERSION::to_python(EINTR)), boost::python::ref (BOOST_PYTHON_CONVERSION::to_python(strerror(EINTR)))); //throw boost::python::OSError(exc_value.reference().get()); std::string message = (boost::format("%1%") % exc_value.reference ().get()).str(); PyErr_SetString(PyExc_OSError, message.c_str()); boost::python::throw_error_already_set(); } #endif DEBUGTRACER_TRACE << __PRETTY_FUNCTION__ << ": " << available << std::endl; } static void set_interval(struct timespec* timeout_spec, double timeout) { double integral, fractional; time_t sec; long nsec; fractional = modf(timeout, &integral); sec = (time_t)integral; nsec = (long)(fractional*1.0e9); #ifdef sun clock_gettime(CLOCK_REALTIME, timeout_spec); #else struct timeval tv; gettimeofday(&tv, 0); timeout_spec->tv_sec = tv.tv_sec; timeout_spec->tv_nsec = tv.tv_usec*1000; #endif timeout_spec->tv_sec += (sec + (timeout_spec->tv_nsec + nsec) / 1000000000); timeout_spec->tv_nsec = (timeout_spec->tv_nsec + nsec) % 1000000000; } bool ThreadedTibrvQueue::waitEvent(double timeout) { DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl; tibrv_u32 available = 0; TibrvStatus status; int cond_status = 0; assert(m_waiting>=0); if (!m_waiting) installSignalHandlers(); m_waiting++; Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock( &m_mutex ); struct timespec timeout_spec; set_interval(&timeout_spec, timeout); status = getCount(available); // while (!available && cond_status != ETIMEDOUT && status == TIBRV_OK) { if (!available && status == TIBRV_OK) { cond_status = pthread_cond_timedwait(&m_cond, &m_mutex, &timeout_spec); status = getCount(available); } // assert(available || cond_status == 
ETIMEDOUT || status != TIBRV_OK); pthread_mutex_unlock( &m_mutex ); Py_END_ALLOW_THREADS; m_waiting--; assert(m_waiting>=0); if (!m_waiting) removeSignalHandlers(); if (status != TIBRV_OK) { throw std::runtime_error((boost::format("TibrvQueue::getCount: %1%") % status.getText()).str()); } if (cond_status != 0 && cond_status != ETIMEDOUT) { // FIXME: Should go to a helper lib const char message[] = "pthread_cond_timedwait failed"; PyErr_SetString(PyExc_OSError, message); boost::python::throw_error_already_set(); } DEBUGTRACER_FUNC << "<-- " << __PRETTY_FUNCTION__ << ": " << available << std::endl; return available != 0; } void ThreadedTibrvQueue::_myHook( tibrvQueue eventQueue, void* closure) { DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl; ThreadedTibrvQueue* queue = (ThreadedTibrvQueue*)closure; assert(queue->getHandle() == eventQueue); pthread_mutex_lock( &(queue->m_mutex) ); pthread_cond_signal( &(queue->m_cond) ); pthread_mutex_unlock( &(queue->m_mutex) ); DEBUGTRACER_FUNC << "<-- " << __PRETTY_FUNCTION__ << std::endl; } void ThreadedTibrvQueue::sighandler(int signum) { DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl; DEBUGTRACER_TRACE << "signal " << signum << std::endl; pthread_mutex_lock( &m_instances_mutex ); for (instance_list::iterator iter = m_instances.begin(); iter !=m_instances.end(); iter++) { ThreadedTibrvQueue* queue = *iter; pthread_mutex_lock( &(queue->m_mutex) ); DEBUGTRACER_TRACE << __PRETTY_FUNCTION__ << ": pthread_cond_signal" << std::endl; pthread_cond_signal( &(queue->m_cond) ); DEBUGTRACER_TRACE << __PRETTY_FUNCTION__ << ": pthread_mutex_unlock" << std::endl; pthread_mutex_unlock( &(queue->m_mutex) ); } pthread_mutex_unlock( &m_instances_mutex ); sighandler_t oldhandler = m_sighandler[signum]; #ifdef DEBUG #ifdef SIG2STR_MAX char buf[SIG2STR_MAX]; sig2str(signum, buf); DEBUGTRACER_TRACE << __PRETTY_FUNCTION__ << "oldhandler for " << buf << "=" << (void*)(oldhandler) << std::endl; #endif // ifdef SIG2STR_MAX 
#endif // ifdef DEBUG if (oldhandler != SIG_DFL && oldhandler != SIG_IGN && oldhandler != SIG_ERR #ifdef SIG_HOLD && oldhandler != SIG_HOLD #endif ) { (*oldhandler)(signum); } DEBUGTRACER_FUNC << "<-- " << __PRETTY_FUNCTION__ << std::endl; } void ThreadedTibrvQueue::installSignalHandlers() { DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl; m_sighandler[SIGINT] = signal(SIGINT, sighandler); m_sighandler[SIGTERM] = signal(SIGTERM, sighandler); m_sighandler[SIGQUIT] = signal(SIGQUIT, sighandler); DEBUGTRACER_FUNC << "<-- " << __PRETTY_FUNCTION__ << std::endl; } void ThreadedTibrvQueue::removeSignalHandlers() { DEBUGTRACER_FUNC << "--> " << __PRETTY_FUNCTION__ << std::endl; signal(SIGINT, m_sighandler[SIGINT]); signal(SIGTERM, m_sighandler[SIGTERM]); signal(SIGQUIT, m_sighandler[SIGQUIT]); DEBUGTRACER_FUNC << "<-- " << __PRETTY_FUNCTION__ << std::endl; } Landesbank Baden-Wuerttemberg Anstalt des oeffentlichen Rechts Hauptsitze: Stuttgart, Karlsruhe, Mannheim, Mainz HRA 12704 Amtsgericht Stuttgart _______________________________________________ Cplusplus-sig mailing list Cplusplus-sig@python.org https://mail.python.org/mailman/listinfo/cplusplus-sig