How do I achieve a durable subscription based on my program?
sgliu wrote: > > Thanks for your advice. I will read the content you pointed me to. > > > > > tabish121 wrote: >> >>> >>> >>> With the following program, why can't I receive data? >>> How should I modify the code? >> >> Looks like you produce before the durable consumer has been connected >> once. >> >> See this link: >> http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html >> >> A durable consumer must have connected and then disconnected before >> messages are persisted in anticipation of the consumer reconnecting. >> That's why it's important to use the same subscription name when >> reconnecting, so that the broker knows that the consumer that was >> subscribed as durable is now back and it can deliver the messages that >> were stored in its absence. >> >>> >>> #include <activemq/concurrent/Thread.h> >>> #include <activemq/concurrent/Runnable.h> >>> #include <activemq/core/ActiveMQConnectionFactory.h> >>> #include <activemq/util/Integer.h> >>> #include <cms/Connection.h> >>> #include <cms/Session.h> >>> #include <cms/TextMessage.h> >>> #include <cms/ExceptionListener.h> >>> #include <cms/MessageListener.h> >>> #include <stdlib.h> >>> >>> using namespace activemq::core; >>> using namespace activemq::util; >>> using namespace activemq::concurrent; >>> using namespace cms; >>> using namespace std; >>> >>> class HelloWorldProducer : public Runnable { >>> private: >>> >>> Connection* connection; >>> Session* session; >>> Topic* destination; >>> MessageProducer* producer; >>> int numMessages; >>> >>> public: >>> >>> HelloWorldProducer( int numMessages ){ >>> connection = NULL; >>> session = NULL; >>> destination = NULL; >>> producer = NULL; >>> this->numMessages = numMessages; >>> } >>> >>> virtual ~HelloWorldProducer(){ >>> cleanup(); >>> } >>> >>> virtual void run() { >>> try { >>> string user,passwd,sID; >>> user="default"; >>> passwd=""; >>> sID="lsgID"; >>> ActiveMQConnectionFactory* connectionFactory = new >>>
ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); >>> >>> connection = >>> connectionFactory->createConnection(user,passwd,sID); >>> connection->start(); >>> >>> string sss=connection->getClientId(); >>> cout << sss << endl; >>> >>> session = connection->createSession( >> Session::AUTO_ACKNOWLEDGE >>> ); >>> destination = session->createTopic( "mytopic" ); >>> >>> producer = session->createProducer( destination >> ); >>> producer->setDeliveryMode( DeliveryMode::PERSISTANT ); >>> >>> producer->setTimeToLive(100000000); >>> string threadIdStr = Integer::toString( Thread::getId() ); >>> >>> // Create a messages >>> string text = (string)"Hello world! from thread " + >>> threadIdStr; >>> >>> for( int ix=0; ix<numMessages; ++ix ){ >>> TextMessage* message = session->createTextMessage( >> text >>> ); >>> >>> string messageID="messageID"; >>> message->setCMSExpiration(10000000000); >>> message->setCMSMessageId(messageID); >>> >>> // Tell the producer to send the message >>> printf( "Sent message from thread %s\n", >>> threadIdStr.c_str() ); >>> producer->send( message ); >>> >>> delete message; >>> } >>> >>> }catch ( CMSException& e ) { >>> e.printStackTrace(); >>> } >>> } >>> >>> private: >>> >>> void cleanup(){ >>> >>> // Destroy resources. >>> try{ >>> if( destination != NULL ) delete destination; >>> }catch ( CMSException& e ) {} >>> destination = NULL; >>> >>> try{ >>> if( producer != NULL ) delete producer; >>> }catch ( CMSException& e ) {} >>> producer = NULL; >>> >>> // Close open resources. 
>>> try{ >>> if( session != NULL ) session->close(); >>> if( connection != NULL ) connection->close(); >>> }catch ( CMSException& e ) {} >>> >>> try{ >>> if( session != NULL ) delete session; >>> }catch ( CMSException& e ) {} >>> session = NULL; >>> >>> try{ >>> if( connection != NULL ) delete connection; >>> }catch ( CMSException& e ) {} >>> connection = NULL; >>> } >>> }; >>> >>> class HelloWorldConsumer : public ExceptionListener, >>> public MessageListener, >>> public Runnable { >>> >>> private: >>> >>> Connection* connection; >>> Session* session; >>> Topic* destination; >>> MessageConsumer* consumer; >>> long waitMillis; >>> >>> public: >>> >>> HelloWorldConsumer( long waitMillis ){ >>> connection = NULL; >>> session = NULL; >>> destination = NULL; >>> consumer = NULL; >>> this->waitMillis = waitMillis; >>> } >>> virtual ~HelloWorldConsumer(){ >>> cleanup(); >>> } >>> >>> virtual void run() { >>> >>> try { >>> >>> string user,passwd,sID; >>> user="default"; >>> passwd=""; >>> sID="lsgID"; >>> // Create a ConnectionFactory >>> ActiveMQConnectionFactory* connectionFactory = >>> new ActiveMQConnectionFactory( >>> "tcp://localhost:61613",user,passwd,sID); >>> >>> // Create a Connection >>> connection = >>> connectionFactory->createConnection();//user,passwd,sID); >>> delete connectionFactory; >>> connection->start(); >>> >>> connection->setExceptionListener(this); >>> >>> // Create a Session >>> session = connection->createSession( >> Session::AUTO_ACKNOWLEDGE >>> ); >>> destination = session->createTopic( "mytopic" ); >>> consumer = session->createDurableConsumer( >> destination , >>> user , >>> "",false); >>> >>> consumer->setMessageListener( this ); >>> >>> Thread::sleep( waitMillis ); >>> >>> } catch (CMSException& e) { >>> e.printStackTrace(); >>> } >>> } >>> >>> virtual void onMessage( const Message* message ){ >>> >>> try >>> { >>> const TextMessage* textMessage = >>> dynamic_cast< const TextMessage* >( message ); >>> string text = textMessage->getText(); 
>>> printf( "Received: %s\n", text.c_str() ); >>> } catch (CMSException& e) { >>> e.printStackTrace(); >>> } >>> } >>> >>> virtual void onException( const CMSException& ex ) { >>> printf("JMS Exception occured. Shutting down client.\n"); >>> } >>> >>> private: >>> >>> void cleanup(){ >>> >>> // Destroy resources. >>> try{ >>> if( destination != NULL ) delete destination; >>> }catch (CMSException& e) {} >>> destination = NULL; >>> >>> try{ >>> if( consumer != NULL ) delete consumer; >>> }catch (CMSException& e) {} >>> consumer = NULL; >>> >>> // Close open resources. >>> try{ >>> if( session != NULL ) session->close(); >>> if( connection != NULL ) connection->close(); >>> }catch (CMSException& e) {} >>> >>> try{ >>> if( session != NULL ) delete session; >>> }catch (CMSException& e) {} >>> session = NULL; >>> >>> try{ >>> if( connection != NULL ) delete connection; >>> }catch (CMSException& e) {} >>> connection = NULL; >>> } >>> }; >>> void Produce() >>> { >>> HelloWorldProducer producer( 2 ); >>> Thread producerThread( &producer ); >>> producerThread.start(); >>> producerThread.join(); >>> } >>> void Consumer() >>> { >>> HelloWorldConsumer consumer( 10000 ); >>> Thread consumerThread( &consumer ); >>> consumerThread.start(); >>> consumerThread.join(); >>> } >>> int main(int argc, char* argv[]) >>> { >>> Produce(); >>> Consumer(); >>> } >>> -- >>> View this message in context: http://www.nabble.com/above-topic- >>> tf2641566.html#a7373622 >>> Sent from the ActiveMQ - User mailing list archive at Nabble.com. >> >> >> > > -- View this message in context: http://www.nabble.com/above-topic-tf2641566.html#a7393074 Sent from the ActiveMQ - User mailing list archive at Nabble.com.