On 03/03/2010 05:12 PM, _cristian_ wrote:

I have a program that reads messages from "local_queue". It works fine. It
gets messages from local_queue and prints them to the screen. This program
starts a SubscriptionManager that should be stopped when it receives SIGINT or
SIGTERM, but it never returns from the SubscriptionManager::stop() function.
Is something wrong with the program, or is it a bug?

SubscriptionManager::stop() is a blocking function. It signals the SubscriptionManager to stop all subscriptions but it also waits for any subscription threads that are working on a message to finish. A function that can block like this is not safe to call inside a signal handler.

I've attached a modified version of your program that just sets a boolean flag in the signal handler and uses a separate thread that watches the flag and calls stop() when it is set. That seems to work fine.

#include <signal.h>

#include <cstdlib>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>

#include <qpid/client/AsyncSession.h>
#include <qpid/client/Connection.h>
#include <qpid/client/SubscriptionManager.h>

// FIXME aconway 2010-03-04:
#include <qpid/sys/Monitor.h>
#include <qpid/sys/Runnable.h>
#include <qpid/sys/Thread.h>
#include <qpid/sys/Time.h>

using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;      // FIXME aconway 2010-03-04: 

using std::stringstream;
using std::string;

/**
 * Message listener that echoes the payload of every delivered message
 * to standard output, one message per line.
 */
class GatherListener : public MessageListener {
  public:
    GatherListener(Session& session): asyncSession(session) {}

    /** Print the data of the delivered message followed by a newline. */
    virtual void received(Message& request) {
        std::cout << request.getData() << std::endl;
    }

  private:
    // Built from the session handed to the constructor; received() does
    // not reference it.
    AsyncSession asyncSession;
};

// Global handle to the subscription manager. main() creates it, and the
// CheckStopped thread calls stop() on it once a shutdown has been requested.
// It is global so that the shutdown path can reach the manager; it stays
// NULL until main() has created the manager.
qpid::client::SubscriptionManager *subscriptions=NULL;

// FIXME aconway 2010-03-04: intermediate thread.
// void shutdown_handler(int signal) {
//     std::cout<<"Before stop()"<<std::endl;
//     subscriptions->stop();
//     std::cout<<"After stop()"<<std::endl;
// }

Monitor stoppedMonitor;
bool stopped=false;
struct CheckStopped : public Runnable {
    void run() {
        Monitor::ScopedLock l(stoppedMonitor);
        std::cout << "CheckStopped thread started" << std::endl;
        while (!stopped) stoppedMonitor.wait();
        std::cout << "before stop" << std::endl;
        subscriptions->stop();
        std::cout << "after stop" << std::endl;
    }
};

void shutdown_handler(int signal) {
    std::cout<<"shutdown_handler"<<std::endl;
    Monitor::ScopedLock l(stoppedMonitor);
    stopped = true;
    stoppedMonitor.notifyAll();
}

int main(int argc, char** argv) {
    // FIXME aconway 2010-03-04: 
    CheckStopped cs;
    Thread checkThread(cs);

    signal(SIGINT, shutdown_handler);
    signal(SIGTERM, shutdown_handler);

    qpid::client::Connection connection;

    try {
        connection.open("localhost",5672, "user", "password");
    } catch( const std::exception &e ) {
        std::cout<<e.what()<<std::endl;
    }

    if( connection.isOpen() ) {
        qpid::client::Session session;

        session = connection.newSession();
        session.queueDeclare(qpid::client::arg::queue = "local_queue");
        session.exchangeBind( qpid::client::arg::exchange = "amq.direct",
                              qpid::client::arg::queue = "local_queue",  qpid::client::arg::bindingKey =
                              "local_queue");

        subscriptions = new qpid::client::SubscriptionManager(session);
        subscriptions->setAutoStop();

        GatherListener gather_listener(session);
        subscriptions->subscribe(gather_listener, "local_queue");

        std::cout<<"Before run()"<<std::endl;
        subscriptions->run();
        std::cout<<"After run()"<<std::endl;
        connection.close();
        session.close();
    }

    if( subscriptions != NULL ) {
        delete subscriptions;
    }

    return (EXIT_SUCCESS);
}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to