Try subscribing the consumers first before sending messages.

// Suggested entry point: run both consumers ahead of the producer.
int main(int argc, char* argv[])
{
    Consumer1();
    Consumer2();
    Produce();
    return 0;
}



sgliu wrote:
Please help me.


sgliu wrote:
I want to produce message A and then exit the producer program. Then I
start two consumer programs (one is C1, the other is C2) at the same time.
C1 should receive A, and C2 should also receive A.

#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Runnable.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Integer.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <string>

using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::concurrent;
using namespace cms;
using namespace std;

/**
 * Runnable that connects to the broker at tcp://localhost:61613 and sends
 * numMessages persistent text messages to the topic "mytopic".
 */
class HelloWorldProducer : public Runnable {
private:

    Connection* connection;
    Session* session;
    Topic* destination;
    MessageProducer* producer;
    int numMessages;

public:

    /**
     * @param numMessages number of text messages run() sends.
     */
    HelloWorldProducer( int numMessages ) :
        connection( NULL ),
        session( NULL ),
        destination( NULL ),
        producer( NULL ),
        numMessages( numMessages ) {
    }

    virtual ~HelloWorldProducer(){
        cleanup();
    }

    /**
     * Opens the connection, creates the session/topic/producer and sends the
     * messages. CMS exceptions are printed and swallowed so that nothing
     * escapes the thread entry point.
     */
    virtual void run() {

        try {
            const string user = "default";
            const string passwd = "";
            const string clientId = "lsgID";

            // The factory is only needed to open the connection, so it lives
            // on the stack: it is released on every path, including when
            // createConnection() throws (the original leaked it).
            ActiveMQConnectionFactory connectionFactory(
                "tcp://localhost:61613", user, passwd, clientId );

            connection = connectionFactory.createConnection( user, passwd, clientId );
            connection->start();

            // Echo the client ID the connection reports.
            const string reportedClientId = connection->getClientId();
            cout << reportedClientId << endl;

            session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            destination = session->createTopic( "mytopic" );

            producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::PERSISTANT );
            producer->setTimeToLive( 100000000 );

            const string threadIdStr = Integer::toString( Thread::getId() );

            // Every message carries the same text.
            const string text = string( "Hello world! from thread " ) + threadIdStr;

            for( int ix = 0; ix < numMessages; ++ix ) {
                TextMessage* message = session->createTextMessage( text );

                // NOTE(review): messaging providers normally overwrite the
                // expiration and message-id headers inside send(); confirm
                // that these two calls have any effect.
                // The LL suffix keeps the literal from overflowing where
                // long is 32 bits wide.
                message->setCMSExpiration( 10000000000LL );
                message->setCMSMessageId( "messageID" );

                // Tell the producer to send the message
                printf( "Sent message from thread %s\n", threadIdStr.c_str() );
                producer->send( message );

                delete message;
            }

        } catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }

private:

    /**
     * Releases every CMS object this producer created. Each step swallows
     * CMSException so a failure in one step does not prevent the next.
     */
    void cleanup(){

        // Destroy resources.
        try {
            if( destination != NULL ) delete destination;
        } catch ( CMSException& ) {}
        destination = NULL;

        try {
            if( producer != NULL ) delete producer;
        } catch ( CMSException& ) {}
        producer = NULL;

        // Close open resources.
        try {
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        } catch ( CMSException& ) {}

        try {
            if( session != NULL ) delete session;
        } catch ( CMSException& ) {}
        session = NULL;

        try {
            if( connection != NULL ) delete connection;
        } catch ( CMSException& ) {}
        connection = NULL;
    }
};

/**
 * Runnable that registers a durable subscription on the topic "mytopic",
 * prints every text message delivered to it and stays subscribed for
 * waitMillis milliseconds.
 *
 * A durable subscription is identified by the pair (client ID, subscription
 * name). Two consumers that must each receive their own copy of a message
 * therefore need different pairs.
 */
class HelloWorldConsumer : public ExceptionListener,
                           public MessageListener,
                           public Runnable {
private:

    Connection* connection;
    Session* session;
    Topic* destination;
    MessageConsumer* consumer;
    long waitMillis;
    string clientId;
    string subscriptionName;

public:

    /**
     * @param waitMillis       how long run() keeps the subscription open.
     * @param clientId         client ID used for the connection; the default
     *                         is the value the original code hard-coded.
     * @param subscriptionName durable subscription name; the default is the
     *                         value the original code hard-coded.
     */
    HelloWorldConsumer( long waitMillis,
                        const string& clientId = "lsgID",
                        const string& subscriptionName = "default" ) :
        connection( NULL ),
        session( NULL ),
        destination( NULL ),
        consumer( NULL ),
        waitMillis( waitMillis ),
        clientId( clientId ),
        subscriptionName( subscriptionName ) {
    }

    virtual ~HelloWorldConsumer(){
        cleanup();
    }

    /**
     * Connects, creates the durable consumer with this object as its message
     * listener, then sleeps for waitMillis while messages are delivered to
     * onMessage(). CMS exceptions are printed and swallowed.
     */
    virtual void run() {

        try {
            const string user = "default";
            const string passwd = "";

            // Create a ConnectionFactory (stack object: released on every path).
            ActiveMQConnectionFactory connectionFactory(
                "tcp://localhost:61613", user, passwd, clientId );

            // Create a Connection using the credentials given to the factory.
            connection = connectionFactory.createConnection();
            connection->start();
            connection->setExceptionListener( this );

            // Create a Session
            session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
            destination = session->createTopic( "mytopic" );

            // Empty selector, noLocal = false.
            consumer = session->createDurableConsumer(
                destination, subscriptionName, "", false );
            consumer->setMessageListener( this );

            // Keep the subscription open while messages arrive.
            Thread::sleep( waitMillis );

        } catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }

    /**
     * Prints the text of a delivered message. Messages that are not
     * TextMessage instances are reported and skipped instead of being
     * dereferenced through a failed dynamic_cast.
     */
    virtual void onMessage( const Message* message ){

        try {
            const TextMessage* textMessage =
                dynamic_cast< const TextMessage* >( message );

            if( textMessage == NULL ) {
                printf( "Received a non-text message; ignored.\n" );
                return;
            }

            const string text = textMessage->getText();
            printf( "Received: %s\n", text.c_str() );

        } catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }

    /** Reports an exception raised on the connection. */
    virtual void onException( const CMSException& ex ) {
        printf("JMS Exception occured. Shutting down client.\n");
    }

private:

    /**
     * Releases every CMS object this consumer created. Each step swallows
     * CMSException so a failure in one step does not prevent the next.
     */
    void cleanup(){

        // Destroy resources.
        try {
            if( destination != NULL ) delete destination;
        } catch ( CMSException& ) {}
        destination = NULL;

        try {
            if( consumer != NULL ) delete consumer;
        } catch ( CMSException& ) {}
        consumer = NULL;

        // Close open resources.
        try {
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        } catch ( CMSException& ) {}

        try {
            if( session != NULL ) delete session;
        } catch ( CMSException& ) {}
        session = NULL;

        try {
            if( connection != NULL ) delete connection;
        } catch ( CMSException& ) {}
        connection = NULL;
    }
};

/** Runs a producer that sends two messages, on its own thread, and waits for it. */
void Produce() {
    HelloWorldProducer producer( 2 );
    Thread producerThread( &producer );
    producerThread.start();
    producerThread.join();
}

/**
 * Runs one durable consumer for ten seconds on its own thread and waits for
 * it to finish.
 */
static void RunConsumer( const string& clientId, const string& subscriptionName ) {
    HelloWorldConsumer consumer( 10000, clientId, subscriptionName );
    Thread consumerThread( &consumer );
    consumerThread.start();
    consumerThread.join();
}

/** First consumer; has its own client ID and subscription name. */
void Consumer1() {
    RunConsumer( "lsgID-C1", "C1" );
}

/** Second consumer; its identity differs from Consumer1's so both get a copy. */
void Consumer2() {
    RunConsumer( "lsgID-C2", "C2" );
}

int main(int argc, char* argv[]) {
    // NOTE(review): per JMS durable-subscription semantics a subscription
    // only retains messages published after it was first registered, so on
    // the very first run the messages sent here precede both subscriptions
    // -- confirm the intended start-up order.
    Produce();
    Consumer1();
    Consumer2();
    return 0;
}



Reply via email to