How do I achieve durable subscription base on my program?


sgliu wrote:
> 
> Thanks for you advice.I wil read your tell me content.
> 
> 
> 
> 
> tabish121 wrote:
>> 
>>> 
>>> 
>>> follow program,why cann't receive data?
>>> How should I modify code?
>> 
>> Looks like you produce before the durable consumer has been connected
>> once. 
>> 
>> See this link:
>> http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html
>> 
>> A durable consumer must have connected and then disconnected before
>> messages are persisted in anticipation of the consumer reconnecting.
>> That's why its important to use the same subscription name when
>> reconnecting, so that the broker know that the consumer that was
>> subscribed as durable is now back and it can deliver the messages that
>> were stored in its absence.  
>> 
>>> 
>>> #include <activemq/concurrent/Thread.h>
>>> #include <activemq/concurrent/Runnable.h>
>>> #include <activemq/core/ActiveMQConnectionFactory.h>
>>> #include <activemq/util/Integer.h>
>>> #include <cms/Connection.h>
>>> #include <cms/Session.h>
>>> #include <cms/TextMessage.h>
>>> #include <cms/ExceptionListener.h>
>>> #include <cms/MessageListener.h>
>>> #include <stdlib.h>
>>> 
>>> using namespace activemq::core;
>>> using namespace activemq::util;
>>> using namespace activemq::concurrent;
>>> using namespace cms;
>>> using namespace std;
>>> 
>>> class HelloWorldProducer : public Runnable {
>>> private:
>>> 
>>>     Connection* connection;
>>>     Session* session;
>>>     Topic* destination;
>>>     MessageProducer* producer;
>>>     int numMessages;
>>> 
>>> public:
>>> 
>>>     HelloWorldProducer( int numMessages ){
>>>             connection = NULL;
>>>             session = NULL;
>>>             destination = NULL;
>>>             producer = NULL;
>>>             this->numMessages = numMessages;
>>>     }
>>> 
>>>     virtual ~HelloWorldProducer(){
>>>             cleanup();
>>>     }
>>> 
>>>     virtual void run() {
>>>         try {
>>>                     string user,passwd,sID;
>>>                     user="default";
>>>                     passwd="";
>>>                     sID="lsgID";
>>>             ActiveMQConnectionFactory* connectionFactory = new
>>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>>> 
>>>             connection =
>>> connectionFactory->createConnection(user,passwd,sID);
>>>             connection->start();
>>> 
>>>                     string sss=connection->getClientId();
>>>                     cout << sss << endl;
>>> 
>>>             session = connection->createSession(
>> Session::AUTO_ACKNOWLEDGE
>>> );
>>>                     destination = session->createTopic( "mytopic" );
>>> 
>>>                     producer = session->createProducer( destination
>> );
>>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>>> 
>>>                     producer->setTimeToLive(100000000);
>>>             string threadIdStr = Integer::toString( Thread::getId() );
>>> 
>>>             // Create a messages
>>>             string text = (string)"Hello world! from thread " +
>>> threadIdStr;
>>> 
>>>             for( int ix=0; ix<numMessages; ++ix ){
>>>                 TextMessage* message = session->createTextMessage(
>> text
>>> );
>>> 
>>>                             string messageID="messageID";
>>>                             message->setCMSExpiration(10000000000);
>>>                             message->setCMSMessageId(messageID);
>>> 
>>>                     // Tell the producer to send the message
>>>                 printf( "Sent message from thread %s\n",
>>> threadIdStr.c_str() );
>>>                     producer->send( message );
>>> 
>>>                     delete message;
>>>             }
>>> 
>>>         }catch ( CMSException& e ) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>> 
>>> private:
>>> 
>>>     void cleanup(){
>>> 
>>>                     // Destroy resources.
>>>                     try{
>>>                     if( destination != NULL ) delete destination;
>>>                     }catch ( CMSException& e ) {}
>>>                     destination = NULL;
>>> 
>>>                     try{
>>>                 if( producer != NULL ) delete producer;
>>>                     }catch ( CMSException& e ) {}
>>>                     producer = NULL;
>>> 
>>>                     // Close open resources.
>>>                     try{
>>>                             if( session != NULL ) session->close();
>>>                             if( connection != NULL ) connection->close();
>>>                     }catch ( CMSException& e ) {}
>>> 
>>>                     try{
>>>                     if( session != NULL ) delete session;
>>>                     }catch ( CMSException& e ) {}
>>>                     session = NULL;
>>> 
>>>             try{
>>>                     if( connection != NULL ) delete connection;
>>>                     }catch ( CMSException& e ) {}
>>>                     connection = NULL;
>>>     }
>>> };
>>> 
>>> class HelloWorldConsumer : public ExceptionListener,
>>>                            public MessageListener,
>>>                            public Runnable {
>>> 
>>> private:
>>> 
>>>     Connection* connection;
>>>     Session* session;
>>>     Topic* destination;
>>>     MessageConsumer* consumer;
>>>     long waitMillis;
>>> 
>>> public:
>>> 
>>>     HelloWorldConsumer( long waitMillis ){
>>>             connection = NULL;
>>>             session = NULL;
>>>             destination = NULL;
>>>             consumer = NULL;
>>>             this->waitMillis = waitMillis;
>>>     }
>>>     virtual ~HelloWorldConsumer(){
>>>             cleanup();
>>>     }
>>> 
>>>     virtual void run() {
>>> 
>>>         try {
>>> 
>>>                     string user,passwd,sID;
>>>                     user="default";
>>>                     passwd="";
>>>                     sID="lsgID";
>>>             // Create a ConnectionFactory
>>>             ActiveMQConnectionFactory* connectionFactory =
>>>                 new ActiveMQConnectionFactory(
>>> "tcp://localhost:61613",user,passwd,sID);
>>> 
>>>             // Create a Connection
>>>             connection =
>>> connectionFactory->createConnection();//user,passwd,sID);
>>>             delete connectionFactory;
>>>             connection->start();
>>> 
>>>             connection->setExceptionListener(this);
>>> 
>>>             // Create a Session
>>>             session = connection->createSession(
>> Session::AUTO_ACKNOWLEDGE
>>> );
>>>                     destination = session->createTopic( "mytopic" );
>>>                     consumer = session->createDurableConsumer(
>> destination ,
>>> user ,
>>> "",false);
>>> 
>>>             consumer->setMessageListener( this );
>>> 
>>>             Thread::sleep( waitMillis );
>>> 
>>>         } catch (CMSException& e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>> 
>>>     virtual void onMessage( const Message* message ){
>>> 
>>>         try
>>>         {
>>>                 const TextMessage* textMessage =
>>>                 dynamic_cast< const TextMessage* >( message );
>>>             string text = textMessage->getText();
>>>             printf( "Received: %s\n", text.c_str() );
>>>         } catch (CMSException& e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>> 
>>>     virtual void onException( const CMSException& ex ) {
>>>         printf("JMS Exception occured.  Shutting down client.\n");
>>>     }
>>> 
>>> private:
>>> 
>>>     void cleanup(){
>>> 
>>>             // Destroy resources.
>>>             try{
>>>             if( destination != NULL ) delete destination;
>>>             }catch (CMSException& e) {}
>>>             destination = NULL;
>>> 
>>>             try{
>>>             if( consumer != NULL ) delete consumer;
>>>             }catch (CMSException& e) {}
>>>             consumer = NULL;
>>> 
>>>             // Close open resources.
>>>             try{
>>>                     if( session != NULL ) session->close();
>>>                     if( connection != NULL ) connection->close();
>>>             }catch (CMSException& e) {}
>>> 
>>>         try{
>>>             if( session != NULL ) delete session;
>>>             }catch (CMSException& e) {}
>>>             session = NULL;
>>> 
>>>             try{
>>>             if( connection != NULL ) delete connection;
>>>             }catch (CMSException& e) {}
>>>             connection = NULL;
>>>     }
>>> };
>>>   void Produce()
>>>  {
>>>      HelloWorldProducer producer( 2 );
>>>      Thread producerThread( &producer );
>>>      producerThread.start();
>>>      producerThread.join();
>>>  }
>>>  void Consumer()
>>>  {
>>>       HelloWorldConsumer consumer( 10000 );
>>>       Thread consumerThread( &consumer );
>>>       consumerThread.start();
>>>       consumerThread.join();
>>>  }
>>>  int main(int argc, char* argv[])
>>>  {
>>>       Produce();
>>>       Consumer();
>>>  }
>>> --
>>> View this message in context: http://www.nabble.com/above-topic-
>>> tf2641566.html#a7373622
>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>> 
>> 
>> 
> 
> 

-- 
View this message in context: 
http://www.nabble.com/above-topic-tf2641566.html#a7393074
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to