Dear all,
I am writing a trading client and i am unable to check the onMessage event
not raising.
i did the following process in creating and using acitvemq consumer and
producer.
try {
// // Create a ConnectionFactory
connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61616");
connectionFactory->setUsername((char*)(LPCTSTR)usr);//xxxxx
connectionFactory->setPassword((char*)(LPCTSTR)pswd);//xxxxxxx
connection = connectionFactory->createConnection();
connection->start();
// // Create a Session
session = connection->createSession(
Session::AUTO_ACKNOWLEDGE );
destination = session->createQueue("ors-commands");
AfxMessageBox("session created by producer !");
//
// Create a MessageProducer from the Session to the Topic or
Queue
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
}catch ( CMSException& e )
{
e.printStackTrace();
MessageBox("ActiveMQConnectionFactory Error !");
}
//AfxBeginThread(startApp,this);
creating consumer
----------------------
std::string brokerURI = "tcp://localhost:61616";
bool useTopics = true;
bool sessionTransacted = false;
int numMessages = 20000;
try {
HelloWorldConsumer consumer( brokerURI, numMessages, useTopics,
sessionTransacted );
// Start the consumer thread.
Thread consumerThread( &consumer );
consumerThread.start();
// Wait for the consumer to indicate that its ready to go.
consumer.waitUntilReady();
consumerThread.join();
AfxMessageBox("consumer waiting for the messages !");
}
catch ( CMSException& e )
{
e.printStackTrace();
AfxMessageBox("ActiveMQConnection Consumer Factory Error !");
}
and consumer class is
class HelloWorldConsumer : public ExceptionListener,
public MessageListener,
public Runnable {
private:
CountDownLatch latch;
CountDownLatch doneLatch;
Connection* connection;
Session* session;
Destination* destination;
MessageConsumer* consumer;
long waitMillis;
bool useTopic;
bool sessionTransacted;
std::string brokerURI;
public:
HelloWorldConsumer( const std::string& brokerURI,
long numMessages,
bool useTopic = false,
bool sessionTransacted = false,
long waitMillis = 30000 )
: latch(1), doneLatch(numMessages){
this->connection = NULL;
this->session = NULL;
this->destination = NULL;
this->consumer = NULL;
this->waitMillis = waitMillis;
this->useTopic = useTopic;
this->sessionTransacted = sessionTransacted;
this->brokerURI = brokerURI;
}
virtual ~HelloWorldConsumer(){
cleanup();
}
void waitUntilReady() {
latch.await();
}
virtual void run() {
ConnectionFactory* connectionFactory = NULL;
try {
// Create a ConnectionFactory
connectionFactory =
ConnectionFactory::createCMSConnectionFactory( brokerURI );
// Create a Connection
connection = connectionFactory->createConnection();
delete connectionFactory;
connectionFactory = NULL;
connection->start();
connection->setExceptionListener(this);
// Create a Session
if( this->sessionTransacted == true ) {
session = connection->createSession( Session::SESSION_TRANSACTED );
} else {
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
}
// Create the destination (Topic or Queue)
if( useTopic ) {
destination = session->createTopic( "ors_messages" );
} else {
destination = session->createQueue( "ors_commands" );
}
// Create a MessageConsumer from the Session to the Topic or Queue
consumer = session->createConsumer( destination );
consumer->setMessageListener( this );
std::cout.flush();
std::cerr.flush();
// Indicate we are ready for messages.
//latch.countDown();
// Wait while asynchronous messages come in.
//doneLatch.await( waitMillis );
} catch (CMSException& e) {
// Indicate we are ready for messages.
latch.countDown();
delete connectionFactory;
connectionFactory = NULL;
e.printStackTrace();
}
}
// Called from the consumer since this class is a registered
MessageListener.
virtual void onMessage( const Message* message ){
static int count = 0;
AfxMessageBox("on Messageeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee");
try
{
count++;
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
string text = "";
if( textMessage != NULL ) {
text = textMessage->getText();
} else {
text = "NOT A TEXTMESSAGE!";
}
// printf( "Message #%d Received: %s\n", count, text.c_str() );
AfxMessageBox(text.c_str());
} catch (CMSException& e) {
e.printStackTrace();
}
// Commit all messages.
if( this->sessionTransacted ) {
session->commit();
}
// No matter what, tag the count down latch until done.
doneLatch.countDown();
}
// If something bad happens you see it here as this class is also been
// registered as an ExceptionListener with the connection.
virtual void onException( const CMSException& ex AMQCPP_UNUSED) {
printf("CMS Exception occured. Shutting down client.\n");
exit(1);
}
private:
void cleanup(){
//*************************************************
// Always close destination, consumers and producers before
// you destroy their sessions and connection.
//*************************************************
// Destroy resources.
try{
if( destination != NULL ) delete destination;
}catch (CMSException& e) { e.printStackTrace(); }
destination = NULL;
try{
if( consumer != NULL ) delete consumer;
}catch (CMSException& e) { e.printStackTrace(); }
consumer = NULL;
// Close open resources.
try{
if( session != NULL ) session->close();
if( connection != NULL ) connection->close();
}catch (CMSException& e) { e.printStackTrace(); }
// Now Destroy them
try{
if( session != NULL ) delete session;
}catch (CMSException& e) { e.printStackTrace(); }
session = NULL;
try{
if( connection != NULL ) delete connection;
}catch (CMSException& e) { e.printStackTrace(); }
connection = NULL;
}
};
help me in this issue.
Regards
venu
--
View this message in context:
http://www.nabble.com/onMessage-event-not-raised-tp19390593p19390593.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.