Copy pasting the entire sample main code to reproduce my issue #include "stdafx.h" #include <activemq/concurrent/Thread.h> #include <activemq/concurrent/Runnable.h> #include <activemq/core/ActiveMQConnectionFactory.h> #include <cms/Connection.h> #include <cms/Session.h> #include <cms/TextMessage.h> #include <cms/ExceptionListener.h> #include <cms/MessageListener.h> #include <stdlib.h> #include <stdio.h>
using namespace activemq::core; using namespace activemq::concurrent; using namespace cms; using namespace std; bool GenerateGuid(char* buf, size_t strsz) { GUID a_guid; UuidCreate(&a_guid); if (strsz > 38) { sprintf(buf, "{%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X}", a_guid.Data1, a_guid.Data2, a_guid.Data3, a_guid.Data4[0], a_guid.Data4[1], a_guid.Data4[2], a_guid.Data4[3], a_guid.Data4[4], a_guid.Data4[5], a_guid.Data4[6], a_guid.Data4[7] ); return true; } return false; }; class HelloWorldProducer : public Runnable, public MessageListener { private: Connection* connection; Session* session; Destination* destination; MessageProducer* producer; MessageConsumer* consumer; int numMessages; public: HelloWorldProducer( int numMessages ){ connection = NULL; session = NULL; destination = NULL; producer = NULL; this->numMessages = numMessages; } virtual void onMessage( const Message* message ){ try { const TextMessage* textMessage = dynamic_cast< const TextMessage* >( message ); string text = textMessage->getText(); printf( "Producer Received: %s\n", text.c_str() ); } catch (CMSException& e) { e.printStackTrace(); } } virtual ~HelloWorldProducer(){ cleanup(); } virtual void run() { try { // Create a ConnectionFactory ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61617"); // Create a Connection connection = connectionFactory->createConnection(); connection->start(); // Create a Session session = connection->createSession( Session::AUTO_ACKNOWLEDGE ); // Create the destination (Topic or Queue) destination = session->createTopic( "TEST.FOO" ); // Create a MessageProducer from the Session to the Topic or Queue cout << endl << "Producer has been registered at " << destination->toString() << endl << endl; producer = session->createProducer( destination ); producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT ); // Lets have a reply-back channel to this producer now char* charGuid = new char[40]; GenerateGuid(charGuid, 
39); cms::Topic* replyTopic = session->createTopic(charGuid); consumer = session->createConsumer( replyTopic ); consumer->setMessageListener(this); cout << "Reply back channel has been registered at " << replyTopic->toString() << endl << endl; // Stringify the thread id char threadIdStr[100]; _snprintf(threadIdStr, sizeof(threadIdStr), "%d", Thread::getId() ); // Create a messages string text = (string)"Hello world! from thread " + threadIdStr; for( int ix=0; ix<numMessages; ++ix ){ TextMessage* message = session->createTextMessage( text ); message->setCMSReplyTo(replyTopic->toProviderString()); // Tell the producer to send the message printf( "Producer Sent message from thread %s\n", threadIdStr); producer->send( message ); delete message; } }catch ( CMSException& e ) { e.printStackTrace(); } } private: void cleanup(){ // Destroy resources. try{ if( destination != NULL ) delete destination; }catch ( CMSException& e ) {} destination = NULL; try{ if( producer != NULL ) delete producer; }catch ( CMSException& e ) {} producer = NULL; // Close open resources. 
try{ if( session != NULL ) session->close(); if( connection != NULL ) connection->close(); }catch ( CMSException& e ) {} try{ if( session != NULL ) delete session; }catch ( CMSException& e ) {} session = NULL; try{ if( connection != NULL ) delete connection; }catch ( CMSException& e ) {} connection = NULL; } }; class HelloWorldConsumer : public ExceptionListener, public MessageListener, public Runnable { private: Connection* connection; Session* session; Destination* destination; MessageConsumer* consumer; long waitMillis; public: HelloWorldConsumer( long waitMillis ){ connection = NULL; session = NULL; destination = NULL; consumer = NULL; this->waitMillis = waitMillis; } virtual ~HelloWorldConsumer(){ cleanup(); } virtual void run() { try { // Create a ConnectionFactory ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory( "tcp://127.0.0.1:61617" ); // Create a Connection connection = connectionFactory->createConnection(); delete connectionFactory; connection->start(); connection->setExceptionListener(this); // Create a Session session = connection->createSession( Session::AUTO_ACKNOWLEDGE ); // Create the destination (Topic or Queue) destination = session->createTopic( "TEST.FOO" ); // Create a MessageConsumer from the Session to the Topic or Queue consumer = session->createConsumer( destination ); consumer->setMessageListener( this ); // Sleep while asynchronous messages come in. 
Thread::sleep( waitMillis ); } catch (CMSException& e) { e.printStackTrace(); } } virtual void onMessage( const Message* message ){ try { // display the message received const TextMessage* textMessage = dynamic_cast< const TextMessage* >( message ); string text = textMessage->getText(); printf( "Consumer Received: %s\n", text.c_str() ); // lets reply back with a thanks if (textMessage->getCMSReplyTo().c_str() && strcmp("null", textMessage->getCMSReplyTo().c_str())==1 ) { cms::Topic* destination_ = session->createTopic( textMessage->getCMSReplyTo().c_str() ); MessageProducer* producer = session->createProducer( destination_ ); producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT ); cout << endl << "Consumer Replying back to " << destination_->toString() << endl; TextMessage* message_ = session->createTextMessage( "Thank you for Hello World !!!" ); producer->send(message_); } } catch (CMSException& e) { e.printStackTrace(); } } virtual void onException( const CMSException& ex ) { printf("JMS Exception occured. Shutting down client.\n"); } private: void cleanup(){ // Destroy resources. try{ if( destination != NULL ) delete destination; }catch (CMSException& e) {} destination = NULL; try{ if( consumer != NULL ) delete consumer; }catch (CMSException& e) {} consumer = NULL; // Close open resources. try{ if( session != NULL ) session->close(); if( connection != NULL ) connection->close(); }catch (CMSException& e) {} try{ if( session != NULL ) delete session; }catch (CMSException& e) {} session = NULL; try{ if( connection != NULL ) delete connection; }catch (CMSException& e) {} connection = NULL; } }; int _tmain(int argc, _TCHAR* argv[]) { HelloWorldProducer producer( 5 ); HelloWorldConsumer consumer( 1500 ); // Start the consumer thread. Thread consumerThread( &consumer ); consumerThread.start(); // Start the producer thread. Thread producerThread( &producer ); producerThread.start(); // Wait for the threads to complete. 
producerThread.join(); consumerThread.join(); cout << endl << endl << endl; return 0; } Lalit Nagpal wrote: > > Hi > > I am using the activemq-cpp cms api 1.0 release. The problem I am facing > is like this- > My producer sends a message to the consumer and a message should be sent > from the receiving end as a reply after this - consider a situation where > a loginRequest message has been sent and now a loginReply message should > be sent from the receiving end. > > Attached is a sample main that can reproduce the problem I am facing - I > have modified the sample helloproducer helloconsumer code available at > http://activemq.org/site/activemq-cpp-client.html > to reproduce my problem so that its easier for you to see. > > http://www.nabble.com/file/6111/DestProbs.cpp DestProbs.cpp > > If you execute this piece of code you will see the output as in the > attached image file > > http://www.nabble.com/file/6110/DestinationProblem.JPG > > This is what the code does > producer lets call it xxx sends a "Hello world! from thread xxxx" to the > consumer > consumer lets call it yyy receives the message and displays it > this is the normal behavior as given in the example on > http://activemq.org/site/activemq-cpp-client.html > Following extra needs to be done now > from yyy a reply should go back to xxx ... for this i registered a > producer at yyy by creating a topic using the message->getCMSReplyTo() and > then replying back to that destination. > > The mismatch can be easily see by doing a bstat .... when I created the > consumer yyy initially it created a topic by name say ABCDEFGH (which is a > random id) and later on when I used the message->getCMSReplyTo() to create > a topic the topic was registered with the name /topic/ABCDEFGH ..... the > additional /topic/ that has got added is doing a mess up here and the > replies from yyy to xxx are not reaching xxx (getting enqueued and not > dequeued) ... 
> > The /topic/ prefix gets added by the following statement in the > HelloWorldProducer run() method: > message->setCMSReplyTo(replyTopic->toProviderString()); > The toProviderString() method is what actually adds it ... if you replace this > method with just the toString() method ... you will get a Stomp exception > saying that destinations should start with either /topic/ or /queue/. > > Can somebody make this code work, please? > > For every message sent by the producer to the consumer ("Hello world! from > thread xxxx") there should be a reply coming back as "Thank you for Hello > World !!!". > > Please help me urgently here. > > Thank you in advance > > Lalit Nagpal > CSA, SunGard > > -- View this message in context: http://www.nabble.com/activemq-cms-stomp---creating-a-topic-from-message-%3EgetCMSReplyTo%28%29-does-not-work-tf3142309.html#a8709155 Sent from the ActiveMQ - User mailing list archive at Nabble.com.