Please help me.
sgliu wrote: > > I wish to produce message A, and then exit the producer program. Then I > start two consumer programs (one is C1, the other is C2) at the same time. C1 can > receive A, and C2 can receive A. > > #include <activemq/concurrent/Thread.h> > #include <activemq/concurrent/Runnable.h> > #include <activemq/core/ActiveMQConnectionFactory.h> > #include <activemq/util/Integer.h> > #include <cms/Connection.h> > #include <cms/Session.h> > #include <cms/TextMessage.h> > #include <cms/ExceptionListener.h> > #include <cms/MessageListener.h> > #include <stdlib.h> > > using namespace activemq::core; > using namespace activemq::util; > using namespace activemq::concurrent; > using namespace cms; > using namespace std; > > class HelloWorldProducer : public Runnable { > private: > > Connection* connection; > Session* session; > Topic* destination; > MessageProducer* producer; > int numMessages; > > public: > > HelloWorldProducer( int numMessages ){ > connection = NULL; > session = NULL; > destination = NULL; > producer = NULL; > this->numMessages = numMessages; > } > > virtual ~HelloWorldProducer(){ > cleanup(); > } > > virtual void run() { > try { > string user,passwd,sID; > user="default"; > passwd=""; > sID="lsgID"; > ActiveMQConnectionFactory* connectionFactory = new > ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); > > connection = > connectionFactory->createConnection(user,passwd,sID); > connection->start(); > > string sss=connection->getClientId(); > cout << sss << endl; > > session = connection->createSession( Session::AUTO_ACKNOWLEDGE > ); > destination = session->createTopic( "mytopic" ); > > producer = session->createProducer( destination ); > producer->setDeliveryMode( DeliveryMode::PERSISTANT ); > > producer->setTimeToLive(100000000); > string threadIdStr = Integer::toString( Thread::getId() ); > > // Create a messages > string text = (string)"Hello world! 
from thread " + > threadIdStr; > > for( int ix=0; ix<numMessages; ++ix ){ > TextMessage* message = session->createTextMessage( > text ); > > string messageID="messageID"; > message->setCMSExpiration(10000000000); > message->setCMSMessageId(messageID); > > // Tell the producer to send the message > printf( "Sent message from thread %s\n", threadIdStr.c_str() ); > producer->send( message ); > > delete message; > } > > }catch ( CMSException& e ) { > e.printStackTrace(); > } > } > > private: > > void cleanup(){ > > // Destroy resources. > try{ > if( destination != NULL ) delete destination; > }catch ( CMSException& e ) {} > destination = NULL; > > try{ > if( producer != NULL ) delete producer; > }catch ( CMSException& e ) {} > producer = NULL; > > // Close open resources. > try{ > if( session != NULL ) session->close(); > if( connection != NULL ) connection->close(); > }catch ( CMSException& e ) {} > > try{ > if( session != NULL ) delete session; > }catch ( CMSException& e ) {} > session = NULL; > > try{ > if( connection != NULL ) delete connection; > }catch ( CMSException& e ) {} > connection = NULL; > } > }; > > class HelloWorldConsumer : public ExceptionListener, > public MessageListener, > public Runnable { > > private: > > Connection* connection; > Session* session; > Topic* destination; > MessageConsumer* consumer; > long waitMillis; > > public: > > HelloWorldConsumer( long waitMillis ){ > connection = NULL; > session = NULL; > destination = NULL; > consumer = NULL; > this->waitMillis = waitMillis; > } > virtual ~HelloWorldConsumer(){ > cleanup(); > } > > virtual void run() { > > try { > > string user,passwd,sID; > user="default"; > passwd=""; > sID="lsgID"; > // Create a ConnectionFactory > ActiveMQConnectionFactory* connectionFactory = > new ActiveMQConnectionFactory( > "tcp://localhost:61613",user,passwd,sID); > > // Create a Connection > connection = > connectionFactory->createConnection();//user,passwd,sID); > delete connectionFactory; > connection->start(); 
> > connection->setExceptionListener(this); > > // Create a Session > session = connection->createSession( Session::AUTO_ACKNOWLEDGE > ); > destination = session->createTopic( "mytopic" ); > consumer = session->createDurableConsumer( > destination , user , "",false); > > consumer->setMessageListener( this ); > > Thread::sleep( waitMillis ); > > } catch (CMSException& e) { > e.printStackTrace(); > } > } > > virtual void onMessage( const Message* message ){ > > try > { > const TextMessage* textMessage = > dynamic_cast< const TextMessage* >( message ); > string text = textMessage->getText(); > printf( "Received: %s\n", text.c_str() ); > } catch (CMSException& e) { > e.printStackTrace(); > } > } > > virtual void onException( const CMSException& ex ) { > printf("JMS Exception occured. Shutting down client.\n"); > } > > private: > > void cleanup(){ > > // Destroy resources. > try{ > if( destination != NULL ) delete destination; > }catch (CMSException& e) {} > destination = NULL; > > try{ > if( consumer != NULL ) delete consumer; > }catch (CMSException& e) {} > consumer = NULL; > > // Close open resources. 
> try{ > if( session != NULL ) session->close(); > if( connection != NULL ) connection->close(); > }catch (CMSException& e) {} > > try{ > if( session != NULL ) delete session; > }catch (CMSException& e) {} > session = NULL; > > try{ > if( connection != NULL ) delete connection; > }catch (CMSException& e) {} > connection = NULL; > } > }; > void Produce() > { > HelloWorldProducer producer( 2 ); > Thread producerThread( &producer ); > producerThread.start(); > producerThread.join(); > } > void Consumer1() > { > HelloWorldConsumer consumer( 10000 ); > Thread consumerThread( &consumer ); > consumerThread.start(); > consumerThread.join(); > } > void Consumer2() > { > HelloWorldConsumer consumer( 10000 ); > Thread consumerThread( &consumer ); > consumerThread.start(); > consumerThread.join(); > } > int main(int argc, char* argv[]) > { > Produce(); > Consumer1(); > Consumer2(); > } > -- View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7485492 Sent from the ActiveMQ - User mailing list archive at Nabble.com.