Just so you know I haven't forgotten about you :) ...

I have replicated the problem and I believe it is an issue with the broker.
I'm going to dig into it more later.


On 11/30/06, sgliu <[EMAIL PROTECTED]> wrote:


Sorry, but even after following your suggestion, the message doesn't disappear.
What I expect is:
I send a message first and, some time later, I receive nothing (after some time the
message should disappear by itself).
The source code follows:
#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Runnable.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Integer.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>

using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::concurrent;
using namespace cms;
using namespace std;

#include <sys/timeb.h>
// Returns the current wall-clock time in milliseconds, assembled from the
// whole seconds and the millisecond remainder reported by ftime().
// NOTE(review): the 64-bit sum is narrowed to unsigned long on return, so the
// value is truncated on platforms where unsigned long is only 32 bits wide.
unsigned long getcurt()
{
    struct timeb now;
    ftime( &now );

    const long long wholeSecondsInMs = now.time * 1000LL;
    return static_cast<unsigned long>( wholeSecondsInMs + now.millitm );
}

class HelloWorldProducer : public Runnable {
private:

        Connection* connection;
        Session* session;
        Topic* destination;
        MessageProducer* producer;
        int numMessages;

public:

        HelloWorldProducer( int numMessages ){
                connection = NULL;
        session = NULL;
        destination = NULL;
        producer = NULL;
        this->numMessages = numMessages;
        }

        virtual ~HelloWorldProducer(){
                cleanup();
        }

    virtual void run() {
        try {
                        string user,passwd,sID;
                        user="default";
                        passwd="";
                        sID="lsgID";
            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);

            // Create a Connection
            connection =
connectionFactory->createConnection(user,passwd,sID);
            connection->start();

                        string sss=connection->getClientId();
                        cout << sss << endl;

            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
                        destination = session->createTopic( "mytopic" );

                        producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::PERSISTANT );

                        unsigned long ttt=getcurt();
                        producer->setTimeToLive( 10000);

            // Create the Thread Id String
            string threadIdStr = Integer::toString( Thread::getId() );

            // Create a messages
            string text = (string)"Hello world! from thread " +
threadIdStr;

            for( int ix=0; ix<numMessages; ++ix ){
                    TextMessage* message = session->createTextMessage(
text );

        //                      message->setCMSTimeStamp(ttt);
        //                      message->setCMSExpiration(ttt +
10000);         //消息到期时间
        //                      string messageID="messageID";

        //                      message->setCMSMessageId(messageID);            
//消息ID
                        //      producer->setTimeToLive(10000);


                // Tell the producer to send the message
                    printf( "Sent message from thread %s\n",
threadIdStr.c_str() );
                        producer->send( message );

                delete message;
            }

        }catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }

private:

    void cleanup(){

                        // Destroy resources.
                        try{
                if( destination != NULL ) delete destination;
                        }catch ( CMSException& e ) {}
                        destination = NULL;

                        try{
                    if( producer != NULL ) delete producer;
                        }catch ( CMSException& e ) {}
                        producer = NULL;

                // Close open resources.
                try{
                        if( session != NULL ) session->close();
                        if( connection != NULL ) connection->close();
                        }catch ( CMSException& e ) {}

                        try{
                if( session != NULL ) delete session;
                        }catch ( CMSException& e ) {}
                        session = NULL;

            try{
                if( connection != NULL ) delete connection;
                        }catch ( CMSException& e ) {}
                connection = NULL;
    }
};

class HelloWorldConsumer : public ExceptionListener,
                           public MessageListener,
                           public Runnable {

private:

        Connection* connection;
        Session* session;
        Topic* destination;
        MessageConsumer* consumer;
        long waitMillis;

public:

        HelloWorldConsumer( long waitMillis ){
                connection = NULL;
        session = NULL;
        destination = NULL;
        consumer = NULL;
        this->waitMillis = waitMillis;
        }
    virtual ~HelloWorldConsumer(){
        cleanup();
    }

    virtual void run() {

        try {

                        string user,passwd,sID;
                        user="default";
                        passwd="";
                        sID="lsgID";
            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory =
                new ActiveMQConnectionFactory(
"tcp://localhost:61613",user,passwd,sID);

            // Create a Connection
            connection =
connectionFactory->createConnection();//user,passwd,sID);
            delete connectionFactory;
            connection->start();

            connection->setExceptionListener(this);

            // Create a Session
            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);

            // Create the destination (Topic or Queue)
                        destination = session->createTopic(
"mytopic?consumer.retroactive=true"
);

                        consumer = session->createDurableConsumer(
destination , user ,
"",false);

            consumer->setMessageListener( this );

            // Sleep while asynchronous messages come in.
            Thread::sleep( waitMillis );

        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    virtual void onMessage( const Message* message ){

        try
        {
            const TextMessage* textMessage =
                dynamic_cast< const TextMessage* >( message );
            string text = textMessage->getText();
            printf( "Received: %s\n", text.c_str() );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    virtual void onException( const CMSException& ex ) {
        printf("JMS Exception occured.  Shutting down client.\n");
    }

private:

    void cleanup(){

                // Destroy resources.
                try{
                if( destination != NULL ) delete destination;
                }catch (CMSException& e) {}
                destination = NULL;

                try{
            if( consumer != NULL ) delete consumer;
                }catch (CMSException& e) {}
                consumer = NULL;

                // Close open resources.
                try{
                        if( session != NULL ) session->close();
                        if( connection != NULL ) connection->close();
                }catch (CMSException& e) {}

        try{
                if( session != NULL ) delete session;
                }catch (CMSException& e) {}
                session = NULL;

                try{
                if( connection != NULL ) delete connection;
                }catch (CMSException& e) {}
                connection = NULL;
    }
};
  void Produce()
{
     HelloWorldProducer producer( 2 );
     Thread producerThread( &producer );
     producerThread.start();
     producerThread.join();
}
void Consumer()
{
      HelloWorldConsumer consumer( 5000 );
      Thread consumerThread( &consumer );
      consumerThread.start();
      consumerThread.join();
}
// Entry point for the experiment: the producing step is currently disabled,
// so only the banner is printed and the consumer is run.
int main(int argc, char* argv[])
{
    // Produce();

    std::cout << "Produce End." << std::endl;

    Consumer();
    return 0;
}
----------------------------------------
I ran the following two experiments:
1. In HelloWorldProducer, I added the following code, but 11 seconds later I still
received the message:
producer->setTimeToLive( 10000);

2. In HelloWorldProducer, I added the following code, but 11 seconds later I still
received the message:
unsigned long ttt=getcurt();
producer->setTimeToLive( ttt + 10000);

Each time, I first commented out the Consumer() line in the main function and ran
the program.
Second, I waited 20 seconds.
Third, I commented out the Produce() line in the main function and ran the
program.
In both experiments, I still received the message.

The message doesn't disappear by itself.
Why?
Please help me.
--
View this message in context:
http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7617317
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Reply via email to