Just so you know I haven't forgotten about you :) ...
I have replicated the problem and I believe it is an issue with the broker. I'm going to dig into it more later. On 11/30/06, sgliu <[EMAIL PROTECTED]> wrote:
Sorry,according to your indication,message doesn't disappear. I hope: send first,a few time later,I receive nothing. (a few time later,message will disappear itself.) follow source code: #include <activemq/concurrent/Thread.h> #include <activemq/concurrent/Runnable.h> #include <activemq/core/ActiveMQConnectionFactory.h> #include <activemq/util/Integer.h> #include <cms/Connection.h> #include <cms/Session.h> #include <cms/TextMessage.h> #include <cms/ExceptionListener.h> #include <cms/MessageListener.h> #include <stdlib.h> using namespace activemq::core; using namespace activemq::util; using namespace activemq::concurrent; using namespace cms; using namespace std; #include <sys/timeb.h> unsigned long getcurt() { struct timeb t; ftime (&t); unsigned long timeStamp = (t.time * 1000LL) + t.millitm; return timeStamp; } class HelloWorldProducer : public Runnable { private: Connection* connection; Session* session; Topic* destination; MessageProducer* producer; int numMessages; public: HelloWorldProducer( int numMessages ){ connection = NULL; session = NULL; destination = NULL; producer = NULL; this->numMessages = numMessages; } virtual ~HelloWorldProducer(){ cleanup(); } virtual void run() { try { string user,passwd,sID; user="default"; passwd=""; sID="lsgID"; // Create a ConnectionFactory ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); // Create a Connection connection = connectionFactory->createConnection(user,passwd,sID); connection->start(); string sss=connection->getClientId(); cout << sss << endl; session = connection->createSession( Session::AUTO_ACKNOWLEDGE ); destination = session->createTopic( "mytopic" ); producer = session->createProducer( destination ); producer->setDeliveryMode( DeliveryMode::PERSISTANT ); unsigned long ttt=getcurt(); producer->setTimeToLive( 10000); // Create the Thread Id String string threadIdStr = Integer::toString( Thread::getId() ); // Create a messages string text 
= (string)"Hello world! from thread " + threadIdStr; for( int ix=0; ix<numMessages; ++ix ){ TextMessage* message = session->createTextMessage( text ); // message->setCMSTimeStamp(ttt); // message->setCMSExpiration(ttt + 10000); //消息到期时间 // string messageID="messageID"; // message->setCMSMessageId(messageID); //消息ID // producer->setTimeToLive(10000); // Tell the producer to send the message printf( "Sent message from thread %s\n", threadIdStr.c_str() ); producer->send( message ); delete message; } }catch ( CMSException& e ) { e.printStackTrace(); } } private: void cleanup(){ // Destroy resources. try{ if( destination != NULL ) delete destination; }catch ( CMSException& e ) {} destination = NULL; try{ if( producer != NULL ) delete producer; }catch ( CMSException& e ) {} producer = NULL; // Close open resources. try{ if( session != NULL ) session->close(); if( connection != NULL ) connection->close(); }catch ( CMSException& e ) {} try{ if( session != NULL ) delete session; }catch ( CMSException& e ) {} session = NULL; try{ if( connection != NULL ) delete connection; }catch ( CMSException& e ) {} connection = NULL; } }; class HelloWorldConsumer : public ExceptionListener, public MessageListener, public Runnable { private: Connection* connection; Session* session; Topic* destination; MessageConsumer* consumer; long waitMillis; public: HelloWorldConsumer( long waitMillis ){ connection = NULL; session = NULL; destination = NULL; consumer = NULL; this->waitMillis = waitMillis; } virtual ~HelloWorldConsumer(){ cleanup(); } virtual void run() { try { string user,passwd,sID; user="default"; passwd=""; sID="lsgID"; // Create a ConnectionFactory ActiveMQConnectionFactory* connectionFactory = new ActiveMQConnectionFactory( "tcp://localhost:61613",user,passwd,sID); // Create a Connection connection = connectionFactory->createConnection();//user,passwd,sID); delete connectionFactory; connection->start(); connection->setExceptionListener(this); // Create a Session session = 
connection->createSession( Session::AUTO_ACKNOWLEDGE ); // Create the destination (Topic or Queue) destination = session->createTopic( "mytopic?consumer.retroactive=true" ); consumer = session->createDurableConsumer( destination , user , "",false); consumer->setMessageListener( this ); // Sleep while asynchronous messages come in. Thread::sleep( waitMillis ); } catch (CMSException& e) { e.printStackTrace(); } } virtual void onMessage( const Message* message ){ try { const TextMessage* textMessage = dynamic_cast< const TextMessage* >( message ); string text = textMessage->getText(); printf( "Received: %s\n", text.c_str() ); } catch (CMSException& e) { e.printStackTrace(); } } virtual void onException( const CMSException& ex ) { printf("JMS Exception occured. Shutting down client.\n"); } private: void cleanup(){ // Destroy resources. try{ if( destination != NULL ) delete destination; }catch (CMSException& e) {} destination = NULL; try{ if( consumer != NULL ) delete consumer; }catch (CMSException& e) {} consumer = NULL; // Close open resources. try{ if( session != NULL ) session->close(); if( connection != NULL ) connection->close(); }catch (CMSException& e) {} try{ if( session != NULL ) delete session; }catch (CMSException& e) {} session = NULL; try{ if( connection != NULL ) delete connection; }catch (CMSException& e) {} connection = NULL; } }; void Produce() { HelloWorldProducer producer( 2 ); Thread producerThread( &producer ); producerThread.start(); producerThread.join(); } void Consumer() { HelloWorldConsumer consumer( 5000 ); Thread consumerThread( &consumer ); consumerThread.start(); consumerThread.join(); } int main(int argc, char* argv[]) { // Produce(); cout << "Produce End." << endl; Consumer(); } ---------------------------------------- I made follow two experiments: 1. In HelloWorldProducer,add follow code,but 11 seconds later, I receive message yet. producer->setTimeToLive( 10000); 2. 
In HelloWorldProducer, I added the following code, but 11 seconds later I still received the message: unsigned long ttt=getcurt(); producer->setTimeToLive( ttt + 10000); Each time, I first commented out the Consumer() line in the main function and ran the program. Second, I waited 20 seconds. Third, I commented out the Produce() line in the main function instead and ran the program again. In both experiments I still received the message. The message does not disappear by itself. Why? Please help me. -- View this message in context: http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7617317 Sent from the ActiveMQ - User mailing list archive at Nabble.com.