Author: tabish Date: Sun Dec 3 16:41:27 2006 New Revision: 481999 URL: http://svn.apache.org/viewvc?view=rev&rev=481999 Log: http://issues.apache.org/activemq/browse/AMQCPP-14
Added initial Time to Live processing to the Consumer, and the Time Stamping is now correct in the producer. Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQBytesMessage.h incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMapMessage.h incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMessage.h incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQObjectMessage.h incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQStreamMessage.h incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQTextMessage.h incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessage.h Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQBytesMessage.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQBytesMessage.h?view=diff&rev=481999&r1=481998&r2=481999 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQBytesMessage.h (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQBytesMessage.h Sun Dec 3 16:41:27 2006 @@ -434,6 +434,22 @@ */ virtual unsigned long long getBodyLength(void) const; + public: // ActiveMQMessage + + /** + * Returns if this message has expired, meaning that its + * 
Expiration time has elapsed. + * @returns true if message is expired. + */ + virtual bool isExpired() const { + long long expireTime = this->getCMSExpiration(); + long long currentTime = util::Date::getCurrentTimeMilliseconds(); + if( expireTime > 0 && currentTime > expireTime ) { + return true; + } + return false; + } + }; }}}} Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMapMessage.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMapMessage.h?view=diff&rev=481999&r1=481998&r2=481999 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMapMessage.h (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMapMessage.h Sun Dec 3 16:41:27 2006 @@ -578,6 +578,22 @@ virtual void setString( const std::string& name, const std::string& value ); + public: // ActiveMQMessage + + /** + * Returns if this message has expired, meaning that its + * Expiration time has elapsed. + * @returns true if message is expired. 
+ */ + virtual bool isExpired() const { + long long expireTime = this->getCMSExpiration(); + long long currentTime = util::Date::getCurrentTimeMilliseconds(); + if( expireTime > 0 && currentTime > expireTime ) { + return true; + } + return false; + } + }; }}}} Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMessage.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMessage.h?view=diff&rev=481999&r1=481998&r2=481999 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMessage.h (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQMessage.h Sun Dec 3 16:41:27 2006 @@ -22,6 +22,7 @@ #include <activemq/core/ActiveMQMessage.h> #include <activemq/connector/openwire/marshal/BaseDataStreamMarshaller.h> #include <activemq/core/ActiveMQAckHandler.h> +#include <activemq/util/Date.h> namespace activemq{ namespace connector{ @@ -106,6 +107,15 @@ this->redeliveryCount = count; } + /** + * Returns if this message has expired, meaning that its + * Expiration time has elapsed. + * @returns true if message is expired. 
+ */ + virtual bool isExpired() const { + return false; + } + private: core::ActiveMQAckHandler* ackHandler; Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQObjectMessage.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQObjectMessage.h?view=diff&rev=481999&r1=481998&r2=481999 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQObjectMessage.h (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQObjectMessage.h Sun Dec 3 16:41:27 2006 @@ -67,6 +67,18 @@ virtual void copyDataStructure( const DataStructure* src ) { ActiveMQMessage::copyDataStructure( src ); } + + + public: // ActiveMQMessage + + /** + * Returns if this message has expired, meaning that its + * Expiration time has elapsed. + * @returns true if message is expired. 
+ */ + virtual bool isExpired() const { + return false; + } }; Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQStreamMessage.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQStreamMessage.h?view=diff&rev=481999&r1=481998&r2=481999 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQStreamMessage.h (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQStreamMessage.h Sun Dec 3 16:41:27 2006 @@ -68,6 +68,17 @@ ActiveMQMessage::copyDataStructure( src ); } + public: // ActiveMQMessage + + /** + * Returns if this message has expired, meaning that its + * Expiration time has elapsed. + * @returns true if message is expired. + */ + virtual bool isExpired() const { + return false; + } + }; }}}} Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQTextMessage.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQTextMessage.h?view=diff&rev=481999&r1=481998&r2=481999 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQTextMessage.h (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/ActiveMQTextMessage.h Sun Dec 3 16:41:27 2006 @@ -428,6 +428,22 @@ */ virtual void setText( const std::string& msg ) throw( cms::CMSException ); + public: // ActiveMQMessage + + /** + * Returns if this message has expired, meaning that its + * Expiration time has elapsed. 
+ * @returns true if message is expired. + */ + virtual bool isExpired() const { + long long expireTime = this->getCMSExpiration(); + long long currentTime = util::Date::getCurrentTimeMilliseconds(); + if( expireTime > 0 && currentTime > expireTime ) { + return true; + } + return false; + } + }; }}}} Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h?view=diff&rev=481999&r1=481998&r2=481999 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/commands/StompMessage.h Sun Dec 3 16:41:27 2006 @@ -27,6 +27,7 @@ #include <activemq/exceptions/NoSuchElementException.h> #include <activemq/exceptions/RuntimeException.h> +#include <activemq/util/Date.h> #include <activemq/util/Long.h> #include <activemq/util/Integer.h> #include <activemq/util/Boolean.h> @@ -516,6 +517,20 @@ CommandConstants::toString( CommandConstants::HEADER_REDELIVERYCOUNT ), util::Integer::toString( count ) ); + } + + /** + * Returns if this message has expired, meaning that its + * Expiration time has elapsed. + * @returns true if message is expired. 
+ */ + virtual bool isExpired() const { + long long expireTime = this->getCMSExpiration(); + long long currentTime = util::Date::getCurrentTimeMilliseconds(); + if( expireTime > 0 && currentTime > expireTime ) { + return true; + } + return false; } protected: Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?view=diff&rev=481999&r1=481998&r2=481999 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Sun Dec 3 16:41:27 2006 @@ -291,23 +291,31 @@ { try { + // Don't dispatch expired messages, ack it and then destroy it + if( message->isExpired() ) { + session->acknowledge( this, message ); + delete message; + + // stop now, don't queue + return; + } + // If the Session is in ClientAcknowledge mode, then we set the // handler in the message to this object and send it out. Otherwise // we ack it here for all the other Modes. - if( session->getAcknowledgeMode() == Session::CLIENT_ACKNOWLEDGE ) - { + if( session->getAcknowledgeMode() == Session::CLIENT_ACKNOWLEDGE ) { + // Register ourself so that we can handle the Message's // acknowledge method. 
message->setAckHandler( this ); - } - else - { + + } else { session->acknowledge( this, message ); } // No listener, so we queue it - synchronized( &msgQueue ) - { + synchronized( &msgQueue ) { + msgQueue.push( dynamic_cast< cms::Message* >( message ) ); msgQueue.notifyAll(); } Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessage.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessage.h?view=diff&rev=481999&r1=481998&r2=481999 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessage.h (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessage.h Sun Dec 3 16:41:27 2006 @@ -57,6 +57,13 @@ * @param count the redelivery count */ virtual void setRedeliveryCount( int count ) = 0; + + /** + * Returns if this message has expired, meaning that its + * Expiration time has elapsed. + * @returns true if message is expired. + */ + virtual bool isExpired() const = 0; };