> > How do I achieve durable subscription base on my program? > What is it you are trying to do? What problem are you trying to solve?
> > > sgliu wrote: > > > > Thanks for you advice.I wil read your tell me content. > > > > > > > > > > tabish121 wrote: > >> > >>> > >>> > >>> follow program,why cann't receive data? > >>> How should I modify code? > >> > >> Looks like you produce before the durable consumer has been connected > >> once. > >> > >> See this link: > >> http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html > >> > >> A durable consumer must have connected and then disconnected before > >> messages are persisted in anticipation of the consumer reconnecting. > >> That's why its important to use the same subscription name when > >> reconnecting, so that the broker know that the consumer that was > >> subscribed as durable is now back and it can deliver the messages that > >> were stored in its absence. > >> > >>> > >>> #include <activemq/concurrent/Thread.h> > >>> #include <activemq/concurrent/Runnable.h> > >>> #include <activemq/core/ActiveMQConnectionFactory.h> > >>> #include <activemq/util/Integer.h> > >>> #include <cms/Connection.h> > >>> #include <cms/Session.h> > >>> #include <cms/TextMessage.h> > >>> #include <cms/ExceptionListener.h> > >>> #include <cms/MessageListener.h> > >>> #include <stdlib.h> > >>> > >>> using namespace activemq::core; > >>> using namespace activemq::util; > >>> using namespace activemq::concurrent; > >>> using namespace cms; > >>> using namespace std; > >>> > >>> class HelloWorldProducer : public Runnable { > >>> private: > >>> > >>> Connection* connection; > >>> Session* session; > >>> Topic* destination; > >>> MessageProducer* producer; > >>> int numMessages; > >>> > >>> public: > >>> > >>> HelloWorldProducer( int numMessages ){ > >>> connection = NULL; > >>> session = NULL; > >>> destination = NULL; > >>> producer = NULL; > >>> this->numMessages = numMessages; > >>> } > >>> > >>> virtual ~HelloWorldProducer(){ > >>> cleanup(); > >>> } > >>> > >>> virtual void run() { > >>> try { > >>> string user,passwd,sID; > >>> user="default"; > >>> passwd=""; > >>> sID="lsgID"; > >>> ActiveMQConnectionFactory* connectionFactory = new > >>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); > >>> > >>> connection = > >>> connectionFactory->createConnection(user,passwd,sID); > >>> connection->start(); > >>> > >>> string sss=connection->getClientId(); > >>> cout << sss << endl; > >>> > >>> session = connection->createSession( > >> Session::AUTO_ACKNOWLEDGE > >>> ); > >>> destination = session->createTopic( "mytopic" ); > >>> > >>> producer = session->createProducer( destination > >> ); > >>> producer->setDeliveryMode( DeliveryMode::PERSISTANT ); > >>> > >>> producer->setTimeToLive(100000000); > >>> string threadIdStr = Integer::toString( Thread::getId() ); > >>> > >>> // Create a messages > >>> string text = (string)"Hello world! from thread " + > >>> threadIdStr; > >>> > >>> for( int ix=0; ix<numMessages; ++ix ){ > >>> TextMessage* message = session->createTextMessage( > >> text > >>> ); > >>> > >>> string messageID="messageID"; > >>> message->setCMSExpiration(10000000000); > >>> message->setCMSMessageId(messageID); > >>> > >>> // Tell the producer to send the message > >>> printf( "Sent message from thread %s\n", > >>> threadIdStr.c_str() ); > >>> producer->send( message ); > >>> > >>> delete message; > >>> } > >>> > >>> }catch ( CMSException& e ) { > >>> e.printStackTrace(); > >>> } > >>> } > >>> > >>> private: > >>> > >>> void cleanup(){ > >>> > >>> // Destroy resources. > >>> try{ > >>> if( destination != NULL ) delete destination; > >>> }catch ( CMSException& e ) {} > >>> destination = NULL; > >>> > >>> try{ > >>> if( producer != NULL ) delete producer; > >>> }catch ( CMSException& e ) {} > >>> producer = NULL; > >>> > >>> // Close open resources. > >>> try{ > >>> if( session != NULL ) session->close(); > >>> if( connection != NULL ) connection->close(); > >>> }catch ( CMSException& e ) {} > >>> > >>> try{ > >>> if( session != NULL ) delete session; > >>> }catch ( CMSException& e ) {} > >>> session = NULL; > >>> > >>> try{ > >>> if( connection != NULL ) delete connection; > >>> }catch ( CMSException& e ) {} > >>> connection = NULL; > >>> } > >>> }; > >>> > >>> class HelloWorldConsumer : public ExceptionListener, > >>> public MessageListener, > >>> public Runnable { > >>> > >>> private: > >>> > >>> Connection* connection; > >>> Session* session; > >>> Topic* destination; > >>> MessageConsumer* consumer; > >>> long waitMillis; > >>> > >>> public: > >>> > >>> HelloWorldConsumer( long waitMillis ){ > >>> connection = NULL; > >>> session = NULL; > >>> destination = NULL; > >>> consumer = NULL; > >>> this->waitMillis = waitMillis; > >>> } > >>> virtual ~HelloWorldConsumer(){ > >>> cleanup(); > >>> } > >>> > >>> virtual void run() { > >>> > >>> try { > >>> > >>> string user,passwd,sID; > >>> user="default"; > >>> passwd=""; > >>> sID="lsgID"; > >>> // Create a ConnectionFactory > >>> ActiveMQConnectionFactory* connectionFactory = > >>> new ActiveMQConnectionFactory( > >>> "tcp://localhost:61613",user,passwd,sID); > >>> > >>> // Create a Connection > >>> connection = > >>> connectionFactory->createConnection();//user,passwd,sID); > >>> delete connectionFactory; > >>> connection->start(); > >>> > >>> connection->setExceptionListener(this); > >>> > >>> // Create a Session > >>> session = connection->createSession( > >> Session::AUTO_ACKNOWLEDGE > >>> ); > >>> destination = session->createTopic( "mytopic" ); > >>> consumer = session->createDurableConsumer( > >> destination , > >>> user , > >>> "",false); > >>> > >>> consumer->setMessageListener( this ); > >>> > >>> Thread::sleep( waitMillis ); > >>> > >>> } catch (CMSException& e) { > >>> e.printStackTrace(); > >>> } > >>> } > >>> > >>> virtual void onMessage( const Message* message ){ > >>> > >>> try > >>> { > >>> const TextMessage* textMessage = > >>> dynamic_cast< const TextMessage* >( message ); > >>> string text = textMessage->getText(); > >>> printf( "Received: %s\n", text.c_str() ); > >>> } catch (CMSException& e) { > >>> e.printStackTrace(); > >>> } > >>> } > >>> > >>> virtual void onException( const CMSException& ex ) { > >>> printf("JMS Exception occured. Shutting down client.\n"); > >>> } > >>> > >>> private: > >>> > >>> void cleanup(){ > >>> > >>> // Destroy resources. > >>> try{ > >>> if( destination != NULL ) delete destination; > >>> }catch (CMSException& e) {} > >>> destination = NULL; > >>> > >>> try{ > >>> if( consumer != NULL ) delete consumer; > >>> }catch (CMSException& e) {} > >>> consumer = NULL; > >>> > >>> // Close open resources. > >>> try{ > >>> if( session != NULL ) session->close(); > >>> if( connection != NULL ) connection->close(); > >>> }catch (CMSException& e) {} > >>> > >>> try{ > >>> if( session != NULL ) delete session; > >>> }catch (CMSException& e) {} > >>> session = NULL; > >>> > >>> try{ > >>> if( connection != NULL ) delete connection; > >>> }catch (CMSException& e) {} > >>> connection = NULL; > >>> } > >>> }; > >>> void Produce() > >>> { > >>> HelloWorldProducer producer( 2 ); > >>> Thread producerThread( &producer ); > >>> producerThread.start(); > >>> producerThread.join(); > >>> } > >>> void Consumer() > >>> { > >>> HelloWorldConsumer consumer( 10000 ); > >>> Thread consumerThread( &consumer ); > >>> consumerThread.start(); > >>> consumerThread.join(); > >>> } > >>> int main(int argc, char* argv[]) > >>> { > >>> Produce(); > >>> Consumer(); > >>> } > >>> -- > >>> View this message in context: http://www.nabble.com/above-topic- > >>> tf2641566.html#a7373622 > >>> Sent from the ActiveMQ - User mailing list archive at Nabble.com. > >> > >> > >> > > > > > > -- > View this message in context: http://www.nabble.com/above-topic- > tf2641566.html#a7393074 > Sent from the ActiveMQ - User mailing list archive at Nabble.com.