Hello Tim,
Thanks for your early reply and great support.
and the provided sample application works for me and i am able to send and
receive messages.
coming to mycode
1. connecting to Marketcetera JMS server (on tcp://<machine>:61616 by
default).
2. creating a FIX message, droping it on the oms-commands queue.
3. Listen for execution reports on the oms-messages topic.
regards
venu
Timothy Bish wrote:
>
> Its a bit hard to follow exactly what's been changed in the sample code
> here but it almost looks like you are producing on a queue but
> attempting to receive on a topic.
>
> Does the provided sample application that comes with activemq-cpp work
> for you out of the box?
>
> Regards
> Tim.
>
> On Tue, 2008-09-09 at 04:43 -0700, crazy4venu wrote:
>> Dear all,
>>
>> I am writing a trading client and i am unable to check the onMessage
>> event
>> not raising.
>>
>> i did the following process in creating and using acitvemq consumer and
>> producer.
>>
>> try {
>> // // Create a ConnectionFactory
>> connectionFactory = new
>> ActiveMQConnectionFactory("tcp://localhost:61616");
>>
>>
>> connectionFactory->setUsername((char*)(LPCTSTR)usr);//xxxxx
>>
>> connectionFactory->setPassword((char*)(LPCTSTR)pswd);//xxxxxxx
>>
>> connection = connectionFactory->createConnection();
>> connection->start();
>>
>> // // Create a Session
>> session = connection->createSession(
>> Session::AUTO_ACKNOWLEDGE );
>> destination = session->createQueue("ors-commands");
>>
>> AfxMessageBox("session created by producer !");
>> //
>> // Create a MessageProducer from the Session to the Topic
>> or
>> Queue
>> producer = session->createProducer( destination );
>> producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT
>> );
>> }catch ( CMSException& e )
>> {
>> e.printStackTrace();
>> MessageBox("ActiveMQConnectionFactory Error !");
>> }
>> //AfxBeginThread(startApp,this);
>>
>>
>>
>>
>> creating consumer
>> ----------------------
>> std::string brokerURI = "tcp://localhost:61616";
>>
>> bool useTopics = true;
>> bool sessionTransacted = false;
>> int numMessages = 20000;
>>
>> try {
>>
>> HelloWorldConsumer consumer( brokerURI, numMessages, useTopics,
>> sessionTransacted );
>>
>> // Start the consumer thread.
>> Thread consumerThread( &consumer );
>> consumerThread.start();
>>
>> // Wait for the consumer to indicate that its ready to go.
>> consumer.waitUntilReady();
>> consumerThread.join();
>>
>>
>> AfxMessageBox("consumer waiting for the messages !");
>> }
>> catch ( CMSException& e )
>> {
>> e.printStackTrace();
>> AfxMessageBox("ActiveMQConnection Consumer Factory Error !");
>> }
>>
>>
>> and consumer class is
>>
>>
>> class HelloWorldConsumer : public ExceptionListener,
>> public MessageListener,
>> public Runnable {
>>
>> private:
>>
>> CountDownLatch latch;
>> CountDownLatch doneLatch;
>> Connection* connection;
>> Session* session;
>> Destination* destination;
>> MessageConsumer* consumer;
>> long waitMillis;
>> bool useTopic;
>> bool sessionTransacted;
>> std::string brokerURI;
>>
>> public:
>>
>> HelloWorldConsumer( const std::string& brokerURI,
>> long numMessages,
>> bool useTopic = false,
>> bool sessionTransacted = false,
>> long waitMillis = 30000 )
>> : latch(1), doneLatch(numMessages){
>> this->connection = NULL;
>> this->session = NULL;
>> this->destination = NULL;
>> this->consumer = NULL;
>> this->waitMillis = waitMillis;
>> this->useTopic = useTopic;
>> this->sessionTransacted = sessionTransacted;
>> this->brokerURI = brokerURI;
>> }
>> virtual ~HelloWorldConsumer(){
>> cleanup();
>> }
>>
>> void waitUntilReady() {
>> latch.await();
>> }
>>
>> virtual void run() {
>>
>> ConnectionFactory* connectionFactory = NULL;
>>
>> try {
>>
>> // Create a ConnectionFactory
>> connectionFactory =
>> ConnectionFactory::createCMSConnectionFactory( brokerURI );
>>
>> // Create a Connection
>> connection = connectionFactory->createConnection();
>>
>> delete connectionFactory;
>> connectionFactory = NULL;
>>
>> connection->start();
>>
>> connection->setExceptionListener(this);
>>
>> // Create a Session
>> if( this->sessionTransacted == true ) {
>> session = connection->createSession( Session::SESSION_TRANSACTED );
>> } else {
>> session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
>> }
>>
>> // Create the destination (Topic or Queue)
>> if( useTopic ) {
>> destination = session->createTopic( "ors_messages" );
>> } else {
>> destination = session->createQueue( "ors_commands" );
>> }
>>
>> // Create a MessageConsumer from the Session to the Topic or Queue
>> consumer = session->createConsumer( destination );
>>
>> consumer->setMessageListener( this );
>>
>> std::cout.flush();
>> std::cerr.flush();
>>
>> // Indicate we are ready for messages.
>> //latch.countDown();
>>
>> // Wait while asynchronous messages come in.
>> //doneLatch.await( waitMillis );
>>
>> } catch (CMSException& e) {
>>
>> // Indicate we are ready for messages.
>> latch.countDown();
>>
>> delete connectionFactory;
>> connectionFactory = NULL;
>>
>> e.printStackTrace();
>> }
>> }
>>
>> // Called from the consumer since this class is a registered
>> MessageListener.
>> virtual void onMessage( const Message* message ){
>>
>> static int count = 0;
>>
>> AfxMessageBox("on Messageeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee");
>>
>> try
>> {
>> count++;
>> const TextMessage* textMessage =
>> dynamic_cast< const TextMessage* >( message );
>> string text = "";
>>
>> if( textMessage != NULL ) {
>> text = textMessage->getText();
>> } else {
>> text = "NOT A TEXTMESSAGE!";
>> }
>>
>> // printf( "Message #%d Received: %s\n", count, text.c_str() );
>> AfxMessageBox(text.c_str());
>>
>> } catch (CMSException& e) {
>> e.printStackTrace();
>> }
>>
>> // Commit all messages.
>> if( this->sessionTransacted ) {
>> session->commit();
>> }
>>
>> // No matter what, tag the count down latch until done.
>> doneLatch.countDown();
>> }
>>
>> // If something bad happens you see it here as this class is also been
>> // registered as an ExceptionListener with the connection.
>> virtual void onException( const CMSException& ex AMQCPP_UNUSED) {
>> printf("CMS Exception occured. Shutting down client.\n");
>> exit(1);
>> }
>>
>> private:
>>
>> void cleanup(){
>>
>> //*************************************************
>> // Always close destination, consumers and producers before
>> // you destroy their sessions and connection.
>> //*************************************************
>>
>> // Destroy resources.
>> try{
>> if( destination != NULL ) delete destination;
>> }catch (CMSException& e) { e.printStackTrace(); }
>> destination = NULL;
>>
>> try{
>> if( consumer != NULL ) delete consumer;
>> }catch (CMSException& e) { e.printStackTrace(); }
>> consumer = NULL;
>>
>> // Close open resources.
>> try{
>> if( session != NULL ) session->close();
>> if( connection != NULL ) connection->close();
>> }catch (CMSException& e) { e.printStackTrace(); }
>>
>> // Now Destroy them
>> try{
>> if( session != NULL ) delete session;
>> }catch (CMSException& e) { e.printStackTrace(); }
>> session = NULL;
>>
>> try{
>> if( connection != NULL ) delete connection;
>> }catch (CMSException& e) { e.printStackTrace(); }
>> connection = NULL;
>> }
>> };
>>
>>
>> help me in this issue.
>>
>> Regards
>> venu
>
>
>
--
View this message in context:
http://www.nabble.com/onMessage-event-not-raised-tp19390593p19391920.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.