I wish I producer  message A, and then I exit the producer program. Then I
start two consumer program(one is C1,the other is C2) at same time.C1  can
receive A , C2 can receive A.



tabish121 wrote:
> 
>> 
>> How do I achieve durable subscription base on my program?
>> 
> 
> What is it you are trying to do?  What problem are you trying to solve?
> 
> 
>> 
>> 
>> sgliu wrote:
>> >
>> > Thanks for you advice.I wil read your tell me content.
>> >
>> >
>> >
>> >
>> > tabish121 wrote:
>> >>
>> >>>
>> >>>
>> >>> follow program,why cann't receive data?
>> >>> How should I modify code?
>> >>
>> >> Looks like you produce before the durable consumer has been
> connected
>> >> once.
>> >>
>> >> See this link:
>> >>
> http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html
>> >>
>> >> A durable consumer must have connected and then disconnected before
>> >> messages are persisted in anticipation of the consumer
> reconnecting.
>> >> That's why its important to use the same subscription name when
>> >> reconnecting, so that the broker know that the consumer that was
>> >> subscribed as durable is now back and it can deliver the messages
> that
>> >> were stored in its absence.
>> >>
>> >>>
>> >>> #include <activemq/concurrent/Thread.h>
>> >>> #include <activemq/concurrent/Runnable.h>
>> >>> #include <activemq/core/ActiveMQConnectionFactory.h>
>> >>> #include <activemq/util/Integer.h>
>> >>> #include <cms/Connection.h>
>> >>> #include <cms/Session.h>
>> >>> #include <cms/TextMessage.h>
>> >>> #include <cms/ExceptionListener.h>
>> >>> #include <cms/MessageListener.h>
>> >>> #include <stdlib.h>
>> >>>
>> >>> using namespace activemq::core;
>> >>> using namespace activemq::util;
>> >>> using namespace activemq::concurrent;
>> >>> using namespace cms;
>> >>> using namespace std;
>> >>>
>> >>> class HelloWorldProducer : public Runnable {
>> >>> private:
>> >>>
>> >>>  Connection* connection;
>> >>>  Session* session;
>> >>>  Topic* destination;
>> >>>  MessageProducer* producer;
>> >>>  int numMessages;
>> >>>
>> >>> public:
>> >>>
>> >>>  HelloWorldProducer( int numMessages ){
>> >>>          connection = NULL;
>> >>>          session = NULL;
>> >>>          destination = NULL;
>> >>>          producer = NULL;
>> >>>          this->numMessages = numMessages;
>> >>>  }
>> >>>
>> >>>  virtual ~HelloWorldProducer(){
>> >>>          cleanup();
>> >>>  }
>> >>>
>> >>>     virtual void run() {
>> >>>         try {
>> >>>                  string user,passwd,sID;
>> >>>                  user="default";
>> >>>                  passwd="";
>> >>>                  sID="lsgID";
>> >>>             ActiveMQConnectionFactory* connectionFactory = new
>> >>>
> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>> >>>
>> >>>             connection =
>> >>> connectionFactory->createConnection(user,passwd,sID);
>> >>>             connection->start();
>> >>>
>> >>>                  string sss=connection->getClientId();
>> >>>                  cout << sss << endl;
>> >>>
>> >>>             session = connection->createSession(
>> >> Session::AUTO_ACKNOWLEDGE
>> >>> );
>> >>>                  destination = session->createTopic( "mytopic" );
>> >>>
>> >>>                  producer = session->createProducer( destination
>> >> );
>> >>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>> >>>
>> >>>                  producer->setTimeToLive(100000000);
>> >>>             string threadIdStr = Integer::toString(
> Thread::getId() );
>> >>>
>> >>>             // Create a messages
>> >>>             string text = (string)"Hello world! from thread " +
>> >>> threadIdStr;
>> >>>
>> >>>             for( int ix=0; ix<numMessages; ++ix ){
>> >>>              TextMessage* message = session->createTextMessage(
>> >> text
>> >>> );
>> >>>
>> >>>                          string messageID="messageID";
>> >>>                          message->setCMSExpiration(10000000000);
>> >>>                          message->setCMSMessageId(messageID);
>> >>>
>> >>>                  // Tell the producer to send the message
>> >>>              printf( "Sent message from thread %s\n",
>> >>> threadIdStr.c_str() );
>> >>>                  producer->send( message );
>> >>>
>> >>>                  delete message;
>> >>>             }
>> >>>
>> >>>         }catch ( CMSException& e ) {
>> >>>             e.printStackTrace();
>> >>>         }
>> >>>     }
>> >>>
>> >>> private:
>> >>>
>> >>>     void cleanup(){
>> >>>
>> >>>                  // Destroy resources.
>> >>>                  try{
>> >>>                  if( destination != NULL ) delete destination;
>> >>>                  }catch ( CMSException& e ) {}
>> >>>                  destination = NULL;
>> >>>
>> >>>                  try{
>> >>>              if( producer != NULL ) delete producer;
>> >>>                  }catch ( CMSException& e ) {}
>> >>>                  producer = NULL;
>> >>>
>> >>>                  // Close open resources.
>> >>>                  try{
>> >>>                          if( session != NULL ) session->close();
>> >>>                          if( connection != NULL )
> connection->close();
>> >>>                  }catch ( CMSException& e ) {}
>> >>>
>> >>>                  try{
>> >>>                  if( session != NULL ) delete session;
>> >>>                  }catch ( CMSException& e ) {}
>> >>>                  session = NULL;
>> >>>
>> >>>             try{
>> >>>                  if( connection != NULL ) delete connection;
>> >>>                  }catch ( CMSException& e ) {}
>> >>>                  connection = NULL;
>> >>>     }
>> >>> };
>> >>>
>> >>> class HelloWorldConsumer : public ExceptionListener,
>> >>>                            public MessageListener,
>> >>>                            public Runnable {
>> >>>
>> >>> private:
>> >>>
>> >>>  Connection* connection;
>> >>>  Session* session;
>> >>>  Topic* destination;
>> >>>  MessageConsumer* consumer;
>> >>>  long waitMillis;
>> >>>
>> >>> public:
>> >>>
>> >>>  HelloWorldConsumer( long waitMillis ){
>> >>>          connection = NULL;
>> >>>          session = NULL;
>> >>>          destination = NULL;
>> >>>          consumer = NULL;
>> >>>          this->waitMillis = waitMillis;
>> >>>  }
>> >>>     virtual ~HelloWorldConsumer(){
>> >>>          cleanup();
>> >>>     }
>> >>>
>> >>>     virtual void run() {
>> >>>
>> >>>         try {
>> >>>
>> >>>                  string user,passwd,sID;
>> >>>                  user="default";
>> >>>                  passwd="";
>> >>>                  sID="lsgID";
>> >>>             // Create a ConnectionFactory
>> >>>             ActiveMQConnectionFactory* connectionFactory =
>> >>>                 new ActiveMQConnectionFactory(
>> >>> "tcp://localhost:61613",user,passwd,sID);
>> >>>
>> >>>             // Create a Connection
>> >>>             connection =
>> >>> connectionFactory->createConnection();//user,passwd,sID);
>> >>>             delete connectionFactory;
>> >>>             connection->start();
>> >>>
>> >>>             connection->setExceptionListener(this);
>> >>>
>> >>>             // Create a Session
>> >>>             session = connection->createSession(
>> >> Session::AUTO_ACKNOWLEDGE
>> >>> );
>> >>>                  destination = session->createTopic( "mytopic" );
>> >>>                  consumer = session->createDurableConsumer(
>> >> destination ,
>> >>> user ,
>> >>> "",false);
>> >>>
>> >>>             consumer->setMessageListener( this );
>> >>>
>> >>>             Thread::sleep( waitMillis );
>> >>>
>> >>>         } catch (CMSException& e) {
>> >>>             e.printStackTrace();
>> >>>         }
>> >>>     }
>> >>>
>> >>>     virtual void onMessage( const Message* message ){
>> >>>
>> >>>         try
>> >>>         {
>> >>>              const TextMessage* textMessage =
>> >>>                 dynamic_cast< const TextMessage* >( message );
>> >>>             string text = textMessage->getText();
>> >>>             printf( "Received: %s\n", text.c_str() );
>> >>>         } catch (CMSException& e) {
>> >>>             e.printStackTrace();
>> >>>         }
>> >>>     }
>> >>>
>> >>>     virtual void onException( const CMSException& ex ) {
>> >>>         printf("JMS Exception occured.  Shutting down client.\n");
>> >>>     }
>> >>>
>> >>> private:
>> >>>
>> >>>     void cleanup(){
>> >>>
>> >>>          // Destroy resources.
>> >>>          try{
>> >>>          if( destination != NULL ) delete destination;
>> >>>          }catch (CMSException& e) {}
>> >>>          destination = NULL;
>> >>>
>> >>>          try{
>> >>>             if( consumer != NULL ) delete consumer;
>> >>>          }catch (CMSException& e) {}
>> >>>          consumer = NULL;
>> >>>
>> >>>          // Close open resources.
>> >>>          try{
>> >>>                  if( session != NULL ) session->close();
>> >>>                  if( connection != NULL ) connection->close();
>> >>>          }catch (CMSException& e) {}
>> >>>
>> >>>         try{
>> >>>          if( session != NULL ) delete session;
>> >>>          }catch (CMSException& e) {}
>> >>>          session = NULL;
>> >>>
>> >>>          try{
>> >>>          if( connection != NULL ) delete connection;
>> >>>          }catch (CMSException& e) {}
>> >>>          connection = NULL;
>> >>>     }
>> >>> };
>> >>>   void Produce()
>> >>>  {
>> >>>      HelloWorldProducer producer( 2 );
>> >>>      Thread producerThread( &producer );
>> >>>      producerThread.start();
>> >>>      producerThread.join();
>> >>>  }
>> >>>  void Consumer()
>> >>>  {
>> >>>       HelloWorldConsumer consumer( 10000 );
>> >>>       Thread consumerThread( &consumer );
>> >>>       consumerThread.start();
>> >>>       consumerThread.join();
>> >>>  }
>> >>>  int main(int argc, char* argv[])
>> >>>  {
>> >>>       Produce();
>> >>>       Consumer();
>> >>>  }
>> >>> --
>> >>> View this message in context: http://www.nabble.com/above-topic-
>> >>> tf2641566.html#a7373622
>> >>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>> >>
>> >>
>> >>
>> >
>> >
>> 
>> --
>> View this message in context: http://www.nabble.com/above-topic-
>> tf2641566.html#a7393074
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> 
> 
> 

-- 
View this message in context: 
http://www.nabble.com/above-topic-tf2641566.html#a7413840
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to