I wish I producer message A, and then I exit the producer program. Then I start two consumer program(one is C1,the other is C2) at same time.C1 can receive A , C2 can receive A.
tabish121 wrote: > >> >> How do I achieve durable subscription base on my program? >> > > What is it you are trying to do? What problem are you trying to solve? > > >> >> >> sgliu wrote: >> > >> > Thanks for you advice.I wil read your tell me content. >> > >> > >> > >> > >> > tabish121 wrote: >> >> >> >>> >> >>> >> >>> follow program,why cann't receive data? >> >>> How should I modify code? >> >> >> >> Looks like you produce before the durable consumer has been > connected >> >> once. >> >> >> >> See this link: >> >> > http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html >> >> >> >> A durable consumer must have connected and then disconnected before >> >> messages are persisted in anticipation of the consumer > reconnecting. >> >> That's why its important to use the same subscription name when >> >> reconnecting, so that the broker know that the consumer that was >> >> subscribed as durable is now back and it can deliver the messages > that >> >> were stored in its absence. >> >> >> >>> >> >>> #include <activemq/concurrent/Thread.h> >> >>> #include <activemq/concurrent/Runnable.h> >> >>> #include <activemq/core/ActiveMQConnectionFactory.h> >> >>> #include <activemq/util/Integer.h> >> >>> #include <cms/Connection.h> >> >>> #include <cms/Session.h> >> >>> #include <cms/TextMessage.h> >> >>> #include <cms/ExceptionListener.h> >> >>> #include <cms/MessageListener.h> >> >>> #include <stdlib.h> >> >>> >> >>> using namespace activemq::core; >> >>> using namespace activemq::util; >> >>> using namespace activemq::concurrent; >> >>> using namespace cms; >> >>> using namespace std; >> >>> >> >>> class HelloWorldProducer : public Runnable { >> >>> private: >> >>> >> >>> Connection* connection; >> >>> Session* session; >> >>> Topic* destination; >> >>> MessageProducer* producer; >> >>> int numMessages; >> >>> >> >>> public: >> >>> >> >>> HelloWorldProducer( int numMessages ){ >> >>> connection = NULL; >> >>> session = NULL; >> >>> destination = NULL; >> >>> producer = NULL; >> >>> this->numMessages = numMessages; >> >>> } >> >>> >> >>> virtual ~HelloWorldProducer(){ >> >>> cleanup(); >> >>> } >> >>> >> >>> virtual void run() { >> >>> try { >> >>> string user,passwd,sID; >> >>> user="default"; >> >>> passwd=""; >> >>> sID="lsgID"; >> >>> ActiveMQConnectionFactory* connectionFactory = new >> >>> > ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); >> >>> >> >>> connection = >> >>> connectionFactory->createConnection(user,passwd,sID); >> >>> connection->start(); >> >>> >> >>> string sss=connection->getClientId(); >> >>> cout << sss << endl; >> >>> >> >>> session = connection->createSession( >> >> Session::AUTO_ACKNOWLEDGE >> >>> ); >> >>> destination = session->createTopic( "mytopic" ); >> >>> >> >>> producer = session->createProducer( destination >> >> ); >> >>> producer->setDeliveryMode( DeliveryMode::PERSISTANT ); >> >>> >> >>> producer->setTimeToLive(100000000); >> >>> string threadIdStr = Integer::toString( > Thread::getId() ); >> >>> >> >>> // Create a messages >> >>> string text = (string)"Hello world! from thread " + >> >>> threadIdStr; >> >>> >> >>> for( int ix=0; ix<numMessages; ++ix ){ >> >>> TextMessage* message = session->createTextMessage( >> >> text >> >>> ); >> >>> >> >>> string messageID="messageID"; >> >>> message->setCMSExpiration(10000000000); >> >>> message->setCMSMessageId(messageID); >> >>> >> >>> // Tell the producer to send the message >> >>> printf( "Sent message from thread %s\n", >> >>> threadIdStr.c_str() ); >> >>> producer->send( message ); >> >>> >> >>> delete message; >> >>> } >> >>> >> >>> }catch ( CMSException& e ) { >> >>> e.printStackTrace(); >> >>> } >> >>> } >> >>> >> >>> private: >> >>> >> >>> void cleanup(){ >> >>> >> >>> // Destroy resources. >> >>> try{ >> >>> if( destination != NULL ) delete destination; >> >>> }catch ( CMSException& e ) {} >> >>> destination = NULL; >> >>> >> >>> try{ >> >>> if( producer != NULL ) delete producer; >> >>> }catch ( CMSException& e ) {} >> >>> producer = NULL; >> >>> >> >>> // Close open resources. >> >>> try{ >> >>> if( session != NULL ) session->close(); >> >>> if( connection != NULL ) > connection->close(); >> >>> }catch ( CMSException& e ) {} >> >>> >> >>> try{ >> >>> if( session != NULL ) delete session; >> >>> }catch ( CMSException& e ) {} >> >>> session = NULL; >> >>> >> >>> try{ >> >>> if( connection != NULL ) delete connection; >> >>> }catch ( CMSException& e ) {} >> >>> connection = NULL; >> >>> } >> >>> }; >> >>> >> >>> class HelloWorldConsumer : public ExceptionListener, >> >>> public MessageListener, >> >>> public Runnable { >> >>> >> >>> private: >> >>> >> >>> Connection* connection; >> >>> Session* session; >> >>> Topic* destination; >> >>> MessageConsumer* consumer; >> >>> long waitMillis; >> >>> >> >>> public: >> >>> >> >>> HelloWorldConsumer( long waitMillis ){ >> >>> connection = NULL; >> >>> session = NULL; >> >>> destination = NULL; >> >>> consumer = NULL; >> >>> this->waitMillis = waitMillis; >> >>> } >> >>> virtual ~HelloWorldConsumer(){ >> >>> cleanup(); >> >>> } >> >>> >> >>> virtual void run() { >> >>> >> >>> try { >> >>> >> >>> string user,passwd,sID; >> >>> user="default"; >> >>> passwd=""; >> >>> sID="lsgID"; >> >>> // Create a ConnectionFactory >> >>> ActiveMQConnectionFactory* connectionFactory = >> >>> new ActiveMQConnectionFactory( >> >>> "tcp://localhost:61613",user,passwd,sID); >> >>> >> >>> // Create a Connection >> >>> connection = >> >>> connectionFactory->createConnection();//user,passwd,sID); >> >>> delete connectionFactory; >> >>> connection->start(); >> >>> >> >>> connection->setExceptionListener(this); >> >>> >> >>> // Create a Session >> >>> session = connection->createSession( >> >> Session::AUTO_ACKNOWLEDGE >> >>> ); >> >>> destination = session->createTopic( "mytopic" ); >> >>> consumer = session->createDurableConsumer( >> >> destination , >> >>> user , >> >>> "",false); >> >>> >> >>> consumer->setMessageListener( this ); >> >>> >> >>> Thread::sleep( waitMillis ); >> >>> >> >>> } catch (CMSException& e) { >> >>> e.printStackTrace(); >> >>> } >> >>> } >> >>> >> >>> virtual void onMessage( const Message* message ){ >> >>> >> >>> try >> >>> { >> >>> const TextMessage* textMessage = >> >>> dynamic_cast< const TextMessage* >( message ); >> >>> string text = textMessage->getText(); >> >>> printf( "Received: %s\n", text.c_str() ); >> >>> } catch (CMSException& e) { >> >>> e.printStackTrace(); >> >>> } >> >>> } >> >>> >> >>> virtual void onException( const CMSException& ex ) { >> >>> printf("JMS Exception occured. Shutting down client.\n"); >> >>> } >> >>> >> >>> private: >> >>> >> >>> void cleanup(){ >> >>> >> >>> // Destroy resources. >> >>> try{ >> >>> if( destination != NULL ) delete destination; >> >>> }catch (CMSException& e) {} >> >>> destination = NULL; >> >>> >> >>> try{ >> >>> if( consumer != NULL ) delete consumer; >> >>> }catch (CMSException& e) {} >> >>> consumer = NULL; >> >>> >> >>> // Close open resources. >> >>> try{ >> >>> if( session != NULL ) session->close(); >> >>> if( connection != NULL ) connection->close(); >> >>> }catch (CMSException& e) {} >> >>> >> >>> try{ >> >>> if( session != NULL ) delete session; >> >>> }catch (CMSException& e) {} >> >>> session = NULL; >> >>> >> >>> try{ >> >>> if( connection != NULL ) delete connection; >> >>> }catch (CMSException& e) {} >> >>> connection = NULL; >> >>> } >> >>> }; >> >>> void Produce() >> >>> { >> >>> HelloWorldProducer producer( 2 ); >> >>> Thread producerThread( &producer ); >> >>> producerThread.start(); >> >>> producerThread.join(); >> >>> } >> >>> void Consumer() >> >>> { >> >>> HelloWorldConsumer consumer( 10000 ); >> >>> Thread consumerThread( &consumer ); >> >>> consumerThread.start(); >> >>> consumerThread.join(); >> >>> } >> >>> int main(int argc, char* argv[]) >> >>> { >> >>> Produce(); >> >>> Consumer(); >> >>> } >> >>> -- >> >>> View this message in context: http://www.nabble.com/above-topic- >> >>> tf2641566.html#a7373622 >> >>> Sent from the ActiveMQ - User mailing list archive at Nabble.com. >> >> >> >> >> >> >> > >> > >> >> -- >> View this message in context: http://www.nabble.com/above-topic- >> tf2641566.html#a7393074 >> Sent from the ActiveMQ - User mailing list archive at Nabble.com. > > > -- View this message in context: http://www.nabble.com/above-topic-tf2641566.html#a7413840 Sent from the ActiveMQ - User mailing list archive at Nabble.com.