Its a bit hard to follow exactly what's been changed in the sample code
here but it almost looks like you are producing on a queue but
attempting to receive on a topic.
Does the provided sample application that comes with activemq-cpp work
for you out of the box?
Regards
Tim.
On Tue, 2008-09-09 at 04:43 -0700, crazy4venu wrote:
> Dear all,
>
> I am writing a trading client and i am unable to check the onMessage event
> not raising.
>
> i did the following process in creating and using acitvemq consumer and
> producer.
>
> try {
> // // Create a ConnectionFactory
> connectionFactory = new
> ActiveMQConnectionFactory("tcp://localhost:61616");
>
> connectionFactory->setUsername((char*)(LPCTSTR)usr);//xxxxx
>
> connectionFactory->setPassword((char*)(LPCTSTR)pswd);//xxxxxxx
>
> connection = connectionFactory->createConnection();
> connection->start();
>
> // // Create a Session
> session = connection->createSession(
> Session::AUTO_ACKNOWLEDGE );
> destination = session->createQueue("ors-commands");
>
> AfxMessageBox("session created by producer !");
> //
> // Create a MessageProducer from the Session to the Topic or
> Queue
> producer = session->createProducer( destination );
> producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
> }catch ( CMSException& e )
> {
> e.printStackTrace();
> MessageBox("ActiveMQConnectionFactory Error !");
> }
> //AfxBeginThread(startApp,this);
>
>
>
>
> creating consumer
> ----------------------
> std::string brokerURI = "tcp://localhost:61616";
>
> bool useTopics = true;
> bool sessionTransacted = false;
> int numMessages = 20000;
>
> try {
>
> HelloWorldConsumer consumer( brokerURI, numMessages, useTopics,
> sessionTransacted );
>
> // Start the consumer thread.
> Thread consumerThread( &consumer );
> consumerThread.start();
>
> // Wait for the consumer to indicate that its ready to go.
> consumer.waitUntilReady();
> consumerThread.join();
>
>
> AfxMessageBox("consumer waiting for the messages !");
> }
> catch ( CMSException& e )
> {
> e.printStackTrace();
> AfxMessageBox("ActiveMQConnection Consumer Factory Error !");
> }
>
>
> and consumer class is
>
>
> class HelloWorldConsumer : public ExceptionListener,
> public MessageListener,
> public Runnable {
>
> private:
>
> CountDownLatch latch;
> CountDownLatch doneLatch;
> Connection* connection;
> Session* session;
> Destination* destination;
> MessageConsumer* consumer;
> long waitMillis;
> bool useTopic;
> bool sessionTransacted;
> std::string brokerURI;
>
> public:
>
> HelloWorldConsumer( const std::string& brokerURI,
> long numMessages,
> bool useTopic = false,
> bool sessionTransacted = false,
> long waitMillis = 30000 )
> : latch(1), doneLatch(numMessages){
> this->connection = NULL;
> this->session = NULL;
> this->destination = NULL;
> this->consumer = NULL;
> this->waitMillis = waitMillis;
> this->useTopic = useTopic;
> this->sessionTransacted = sessionTransacted;
> this->brokerURI = brokerURI;
> }
> virtual ~HelloWorldConsumer(){
> cleanup();
> }
>
> void waitUntilReady() {
> latch.await();
> }
>
> virtual void run() {
>
> ConnectionFactory* connectionFactory = NULL;
>
> try {
>
> // Create a ConnectionFactory
> connectionFactory =
> ConnectionFactory::createCMSConnectionFactory( brokerURI );
>
> // Create a Connection
> connection = connectionFactory->createConnection();
>
> delete connectionFactory;
> connectionFactory = NULL;
>
> connection->start();
>
> connection->setExceptionListener(this);
>
> // Create a Session
> if( this->sessionTransacted == true ) {
> session = connection->createSession( Session::SESSION_TRANSACTED );
> } else {
> session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
> }
>
> // Create the destination (Topic or Queue)
> if( useTopic ) {
> destination = session->createTopic( "ors_messages" );
> } else {
> destination = session->createQueue( "ors_commands" );
> }
>
> // Create a MessageConsumer from the Session to the Topic or Queue
> consumer = session->createConsumer( destination );
>
> consumer->setMessageListener( this );
>
> std::cout.flush();
> std::cerr.flush();
>
> // Indicate we are ready for messages.
> //latch.countDown();
>
> // Wait while asynchronous messages come in.
> //doneLatch.await( waitMillis );
>
> } catch (CMSException& e) {
>
> // Indicate we are ready for messages.
> latch.countDown();
>
> delete connectionFactory;
> connectionFactory = NULL;
>
> e.printStackTrace();
> }
> }
>
> // Called from the consumer since this class is a registered
> MessageListener.
> virtual void onMessage( const Message* message ){
>
> static int count = 0;
>
> AfxMessageBox("on Messageeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee");
>
> try
> {
> count++;
> const TextMessage* textMessage =
> dynamic_cast< const TextMessage* >( message );
> string text = "";
>
> if( textMessage != NULL ) {
> text = textMessage->getText();
> } else {
> text = "NOT A TEXTMESSAGE!";
> }
>
> // printf( "Message #%d Received: %s\n", count, text.c_str() );
> AfxMessageBox(text.c_str());
>
> } catch (CMSException& e) {
> e.printStackTrace();
> }
>
> // Commit all messages.
> if( this->sessionTransacted ) {
> session->commit();
> }
>
> // No matter what, tag the count down latch until done.
> doneLatch.countDown();
> }
>
> // If something bad happens you see it here as this class is also been
> // registered as an ExceptionListener with the connection.
> virtual void onException( const CMSException& ex AMQCPP_UNUSED) {
> printf("CMS Exception occured. Shutting down client.\n");
> exit(1);
> }
>
> private:
>
> void cleanup(){
>
> //*************************************************
> // Always close destination, consumers and producers before
> // you destroy their sessions and connection.
> //*************************************************
>
> // Destroy resources.
> try{
> if( destination != NULL ) delete destination;
> }catch (CMSException& e) { e.printStackTrace(); }
> destination = NULL;
>
> try{
> if( consumer != NULL ) delete consumer;
> }catch (CMSException& e) { e.printStackTrace(); }
> consumer = NULL;
>
> // Close open resources.
> try{
> if( session != NULL ) session->close();
> if( connection != NULL ) connection->close();
> }catch (CMSException& e) { e.printStackTrace(); }
>
> // Now Destroy them
> try{
> if( session != NULL ) delete session;
> }catch (CMSException& e) { e.printStackTrace(); }
> session = NULL;
>
> try{
> if( connection != NULL ) delete connection;
> }catch (CMSException& e) { e.printStackTrace(); }
> connection = NULL;
> }
> };
>
>
> help me in this issue.
>
> Regards
> venu