On 03/03/2010 05:12 PM, _cristian_ wrote:
I have a program that reads messages from "local_queue". It works fine. It
gets messages from local_queue and prints them to the screen. This program
starts a SubscriptionManager that should be stopped when the program receives
SIGINT or SIGTERM, but it never returns from the SubscriptionManager::stop()
function. Is something wrong with the program, or is it a bug?
SubscriptionManager::stop() is a blocking function. It signals the
SubscriptionManager to stop all subscriptions but it also waits for any
subscription threads that are working on a message to finish. A function that
can block like this is not safe to call inside a signal handler.
I've attached a modified version of your program that just sets a boolean flag
in the signal handler and uses a separate thread that watches the flag and
calls stop() when it is set. That seems to work fine.
#include <errno.h>
#include <signal.h>
#include <unistd.h>
#include <cstdlib>
#include <iostream>
#include <string>
#include <stdexcept>
#include <qpid/client/Connection.h>
#include <qpid/client/AsyncSession.h>
#include <qpid/client/SubscriptionManager.h>
// FIXME aconway 2010-03-04:
#include <qpid/sys/Monitor.h>
#include <qpid/sys/Runnable.h>
using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys; // FIXME aconway 2010-03-04:
using std::stringstream;
using std::string;
/**
 * MessageListener that echoes the data of every received message to stdout.
 */
class GatherListener : public MessageListener {
  public:
    /**
     * @param session Session used to initialise the asyncSession member.
     */
    GatherListener(Session& session) : asyncSession(session) {}

    /**
     * Prints the data of @p request on its own line and flushes std::cout.
     *
     * @param request The delivered message.
     */
    virtual void received(Message& request) {
        std::cout << request.getData() << std::endl;
    }

  private:
    // Initialised from the constructor argument; not read anywhere in this class.
    AsyncSession asyncSession;
};
// Global so that code outside main() can reach the manager: main() allocates
// and deletes it, and CheckStopped::run() calls stop() on it once a shutdown
// has been requested. It stays NULL until main() has created the
// SubscriptionManager.
qpid::client::SubscriptionManager *subscriptions=NULL;
// FIXME aconway 2010-03-04: intermediate thread.
// void shutdown_handler(int signal) {
// std::cout<<"Before stop()"<<std::endl;
// subscriptions->stop();
// std::cout<<"After stop()"<<std::endl;
// }
// Shutdown request flag. Written only by shutdown_handler() and read by
// CheckStopped::run(). volatile sig_atomic_t is the one object type a signal
// handler may portably write, so the handler needs no lock to set it.
volatile sig_atomic_t stopped = 0;

// How long the watcher thread sleeps between looks at `stopped`
// (microseconds; usleep() requires a value below 1,000,000).
static const useconds_t kStopPollIntervalUsec = 100 * 1000;

/**
 * Watcher thread body.
 *
 * Waits until shutdown_handler() has raised `stopped`, then calls the blocking
 * SubscriptionManager::stop() from normal thread context, where blocking is
 * allowed (unlike inside a signal handler).
 */
struct CheckStopped : public Runnable {
    void run() {
        std::cout << "CheckStopped thread started" << std::endl;

        // Poll instead of waiting on a condition variable: the signal handler
        // cannot safely lock a mutex or notify a condition, it can only set
        // the flag.
        while (!stopped) {
            ::usleep(kStopPollIntervalUsec);
        }

        // The signal may arrive before main() has created the manager (for
        // example while the connection is still being opened); there is
        // nothing to stop in that case.
        if (subscriptions == NULL) {
            std::cout << "shutdown requested before SubscriptionManager was created"
                      << std::endl;
            return;
        }

        std::cout << "before stop" << std::endl;
        subscriptions->stop();
        std::cout << "after stop" << std::endl;
    }
};

/**
 * Handler for SIGINT / SIGTERM.
 *
 * Restricted to async-signal-safe operations: a write(2) to stdout and a store
 * to a volatile sig_atomic_t. The actual stop() call is made by CheckStopped.
 */
void shutdown_handler(int /*signal*/) {
    // write() may overwrite errno; keep the interrupted code's value intact.
    const int savedErrno = errno;

    static const char notice[] = "shutdown_handler\n";
    const ssize_t written = ::write(STDOUT_FILENO, notice, sizeof(notice) - 1);
    (void) written;  // nothing useful can be done about a failed write here

    stopped = 1;
    errno = savedErrno;
}
/**
 * Connects to the broker on localhost:5672, declares and binds "local_queue",
 * and prints every message delivered to it until the SubscriptionManager
 * stops running.
 *
 * @return EXIT_SUCCESS on a clean run, EXIT_FAILURE if connecting or any
 *         session operation threw.
 */
int main(int argc, char** argv) {
    (void) argc;
    (void) argv;

    // FIXME aconway 2010-03-04:
    // Start the watcher thread, then install the handlers for SIGINT/SIGTERM.
    CheckStopped cs;
    Thread checkThread(cs);
    signal(SIGINT, shutdown_handler);
    signal(SIGTERM, shutdown_handler);

    int exitCode = EXIT_SUCCESS;

    qpid::client::Connection connection;
    try {
        connection.open("localhost",5672, "user", "password");
    } catch( const std::exception &e ) {
        std::cout<<e.what()<<std::endl;
        exitCode = EXIT_FAILURE;
    }

    if( connection.isOpen() ) {
        // Session calls can throw; catch here so the SubscriptionManager is
        // still released below and the failure shows up in the exit status.
        try {
            qpid::client::Session session = connection.newSession();
            session.queueDeclare(qpid::client::arg::queue = "local_queue");
            session.exchangeBind(qpid::client::arg::exchange = "amq.direct",
                                 qpid::client::arg::queue = "local_queue",
                                 qpid::client::arg::bindingKey = "local_queue");

            subscriptions = new qpid::client::SubscriptionManager(session);
            subscriptions->setAutoStop();

            GatherListener gather_listener(session);
            subscriptions->subscribe(gather_listener, "local_queue");

            std::cout<<"Before run()"<<std::endl;
            subscriptions->run();
            std::cout<<"After run()"<<std::endl;

            // Close the session while its connection is still open, then the
            // connection itself.
            session.close();
            connection.close();
        } catch( const std::exception &e ) {
            std::cout<<e.what()<<std::endl;
            exitCode = EXIT_FAILURE;
        }
    }

    // NOTE(review): checkThread is not joined, so CheckStopped::run() may
    // still be inside stop() at this point - confirm whether a join is needed.
    delete subscriptions;  // deleting NULL is a no-op
    subscriptions = NULL;

    return exitCode;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]