Author: nmittler Date: Wed Jan 24 12:36:44 2007 New Revision: 499559 URL: http://svn.apache.org/viewvc?view=rev&rev=499559 Log: Making Session.close() also close any non-closed child producers and consumers.
Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Set.h Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Map.h incubator/activemq/activemq-cpp/trunk/activemq-cpp/vs2005-build/vs2005-activemq.vcproj Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?view=diff&rev=499559&r1=499558&r2=499559 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Wed Jan 24 12:36:44 2007 @@ -246,6 +246,7 @@ activemq/util/Map.h \ activemq/util/Math.h \ activemq/util/PrimitiveMap.h \ + activemq/util/Set.h \ activemq/util/URISupport.h \ cms/DeliveryMode.h \ cms/TemporaryQueue.h \ Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=499559&r1=499558&r2=499559 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Wed Jan 24 12:36:44 2007 @@ -61,7 +61,7 @@ } //////////////////////////////////////////////////////////////////////////////// -ActiveMQSession::~ActiveMQSession(void) +ActiveMQSession::~ActiveMQSession() { try { @@ -73,7 +73,7 @@ 
} //////////////////////////////////////////////////////////////////////////////// -void ActiveMQSession::close(void) throw ( cms::CMSException ) +void ActiveMQSession::close() throw ( cms::CMSException ) { // If we're already close, just get outta' here. if( closed ) { @@ -82,26 +82,42 @@ try { - // Mark as done. - closed = true; + // Get the complete list of closeable session resources. + std::vector<cms::Closeable*> allResources; + synchronized( &closableSessionResources ) { + allResources = closableSessionResources.toArray(); + } + // Close all of the resources. + for( unsigned int ix=0; ix<allResources.size(); ++ix ){ + cms::Closeable* resource = allResources[ix]; + try{ + resource->close(); + } catch( cms::CMSException& ex ){ + /* Absorb */ + } + } + // Destroy the Transaction if( transaction != NULL ){ delete transaction; transaction = NULL; } - + // Destroy this sessions resources connection->getConnectionData()-> getConnector()->destroyResource( sessionInfo ); sessionInfo = NULL; + + // Now indicate that this session is closed. + closed = true; } AMQ_CATCH_NOTHROW( ActiveMQException ) AMQ_CATCHALL_NOTHROW( ) } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQSession::commit(void) throw ( cms::CMSException ) +void ActiveMQSession::commit() throw ( cms::CMSException ) { try { @@ -120,7 +136,7 @@ } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQSession::rollback(void) throw ( cms::CMSException ) +void ActiveMQSession::rollback() throw ( cms::CMSException ) { try { @@ -194,13 +210,21 @@ "ActiveMQSession::createConsumer - Session Already Closed" ); } + // Create the consumer instance. ActiveMQConsumer* consumer = new ActiveMQConsumer( connection->getConnectionData()->getConnector()-> createConsumer( destination, sessionInfo, selector, noLocal ), this ); + + // Add the consumer to the map of closeable session resources. 
+ synchronized( &closableSessionResources ) { + closableSessionResources.add( consumer ); + } + // Register this consumer as a listener of messages from the + // connection. connection->addMessageListener( consumer->getConsumerInfo()->getConsumerId(), consumer ); @@ -227,10 +251,18 @@ "ActiveMQSession::createProducer - Session Already Closed" ); } + // Create the consumer instance. ActiveMQConsumer* consumer = new ActiveMQConsumer( connection->getConnectionData()->getConnector()-> createDurableConsumer( destination, sessionInfo, name, selector, noLocal ), this ); + // Add the consumer to the map of closeable session resources. + synchronized( &closableSessionResources ) { + closableSessionResources.add( consumer ); + } + + // Register the consumer as a listener of messages from the + // connection. connection->addMessageListener( consumer->getConsumerInfo()->getConsumerId(), consumer ); @@ -254,9 +286,17 @@ "ActiveMQSession::createProducer - Session Already Closed" ); } - return new ActiveMQProducer( + // Create the producer instance. + ActiveMQProducer* producer = new ActiveMQProducer( connection->getConnectionData()->getConnector()-> createProducer( destination, sessionInfo ), this ); + + // Add the producer to the map of closeable session resources. 
+ synchronized( &closableSessionResources ) { + closableSessionResources.add( producer ); + } + + return producer; } AMQ_CATCH_RETHROW( ActiveMQException ) AMQ_CATCHALL_THROW( ActiveMQException ) @@ -303,7 +343,7 @@ } //////////////////////////////////////////////////////////////////////////////// -cms::TemporaryQueue* ActiveMQSession::createTemporaryQueue(void) +cms::TemporaryQueue* ActiveMQSession::createTemporaryQueue() throw ( cms::CMSException ) { try @@ -324,7 +364,7 @@ } //////////////////////////////////////////////////////////////////////////////// -cms::TemporaryTopic* ActiveMQSession::createTemporaryTopic(void) +cms::TemporaryTopic* ActiveMQSession::createTemporaryTopic() throw ( cms::CMSException ) { try @@ -345,7 +385,7 @@ } //////////////////////////////////////////////////////////////////////////////// -cms::Message* ActiveMQSession::createMessage(void) +cms::Message* ActiveMQSession::createMessage() throw ( cms::CMSException ) { try @@ -365,7 +405,7 @@ } //////////////////////////////////////////////////////////////////////////////// -cms::BytesMessage* ActiveMQSession::createBytesMessage(void) +cms::BytesMessage* ActiveMQSession::createBytesMessage() throw ( cms::CMSException ) { try @@ -403,7 +443,7 @@ } //////////////////////////////////////////////////////////////////////////////// -cms::TextMessage* ActiveMQSession::createTextMessage(void) +cms::TextMessage* ActiveMQSession::createTextMessage() throw ( cms::CMSException ) { try @@ -439,7 +479,7 @@ } //////////////////////////////////////////////////////////////////////////////// -cms::MapMessage* ActiveMQSession::createMapMessage(void) +cms::MapMessage* ActiveMQSession::createMapMessage() throw ( cms::CMSException ) { try @@ -460,14 +500,14 @@ } //////////////////////////////////////////////////////////////////////////////// -cms::Session::AcknowledgeMode ActiveMQSession::getAcknowledgeMode(void) const +cms::Session::AcknowledgeMode ActiveMQSession::getAcknowledgeMode() const { return 
sessionInfo != NULL ? sessionInfo->getAckMode() : Session::AUTO_ACKNOWLEDGE; } //////////////////////////////////////////////////////////////////////////////// -bool ActiveMQSession::isTransacted(void) const +bool ActiveMQSession::isTransacted() const { return sessionInfo != NULL ? sessionInfo->getAckMode() == Session::SESSION_TRANSACTED : false; @@ -554,6 +594,14 @@ transaction->removeFromTransaction( consumer ); } } + + // Remove the entry from the session resource map if it's there + cms::Closeable* closeableResource = dynamic_cast<cms::Closeable*>(resource); + if( closeableResource != NULL ){ + synchronized( &closableSessionResources ) { + closableSessionResources.remove( closeableResource ); + } + } // Free its resources. connection->getConnectionData()-> @@ -564,7 +612,7 @@ } //////////////////////////////////////////////////////////////////////////////// -cms::ExceptionListener* ActiveMQSession::getExceptionListener(void) +cms::ExceptionListener* ActiveMQSession::getExceptionListener() { if( connection != NULL ) { Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?view=diff&rev=499559&r1=499558&r2=499559 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Wed Jan 24 12:36:44 2007 @@ -21,6 +21,8 @@ #include <cms/ExceptionListener.h> #include <activemq/connector/SessionInfo.h> #include <activemq/core/ActiveMQSessionResource.h> +#include <activemq/util/Set.h> +#include <set> namespace activemq{ namespace core{ @@ -36,17 +38,30 @@ { private: - // SessionInfo for this Session + /** + * SessionInfo for this Session + */ connector::SessionInfo* sessionInfo; - // 
Transaction Management object + /** + * Transaction Management object + */ ActiveMQTransaction* transaction; - // Connection + /** + * Connection + */ ActiveMQConnection* connection; - // Bool to indicate if this session was closed. + /** + * Bool to indicate if this session was closed. + */ bool closed; + + /** + * The set of closable session resources (consumers and producers). + */ + util::Set<cms::Closeable*> closableSessionResources; public: @@ -54,7 +69,7 @@ const util::Properties& properties, ActiveMQConnection* connection ); - virtual ~ActiveMQSession(void); + virtual ~ActiveMQSession(); public: // Implements Mehtods @@ -62,21 +77,21 @@ * Closes the Session * @throw CMSException */ - virtual void close(void) throw ( cms::CMSException ); + virtual void close() throw ( cms::CMSException ); /** * Commits all messages done in this transaction and releases any * locks currently held. * @throws CMSException */ - virtual void commit(void) throw ( cms::CMSException ); + virtual void commit() throw ( cms::CMSException ); /** * Rollsback all messages done in this transaction and releases any * locks currently held. * @throws CMSException */ - virtual void rollback(void) throw ( cms::CMSException ); + virtual void rollback() throw ( cms::CMSException ); /** * Creates a MessageConsumer for the specified destination. @@ -160,28 +175,28 @@ * Creates a TemporaryQueue object. * @throws CMSException */ - virtual cms::TemporaryQueue* createTemporaryQueue(void) + virtual cms::TemporaryQueue* createTemporaryQueue() throw ( cms::CMSException ); /** * Creates a TemporaryTopic object. 
* @throws CMSException */ - virtual cms::TemporaryTopic* createTemporaryTopic(void) + virtual cms::TemporaryTopic* createTemporaryTopic() throw ( cms::CMSException ); /** * Creates a new Message * @throws CMSException */ - virtual cms::Message* createMessage(void) + virtual cms::Message* createMessage() throw ( cms::CMSException ); /** * Creates a BytesMessage * @throws CMSException */ - virtual cms::BytesMessage* createBytesMessage(void) + virtual cms::BytesMessage* createBytesMessage() throw ( cms::CMSException ); /** @@ -199,7 +214,7 @@ * Creates a new TextMessage * @throws CMSException */ - virtual cms::TextMessage* createTextMessage(void) + virtual cms::TextMessage* createTextMessage() throw ( cms::CMSException ); /** @@ -214,20 +229,20 @@ * Creates a new TextMessage * @throws CMSException */ - virtual cms::MapMessage* createMapMessage(void) + virtual cms::MapMessage* createMapMessage() throw ( cms::CMSException ); /** * Returns the acknowledgement mode of the session. * @return the Sessions Acknowledge Mode */ - virtual cms::Session::AcknowledgeMode getAcknowledgeMode(void) const; + virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const; /** * Gets if the Sessions is a Transacted Session * @return transacted true - false. */ - virtual bool isTransacted(void) const; + virtual bool isTransacted() const; public: // ActiveMQSession specific Methods @@ -268,14 +283,14 @@ * exceptions that occur in the context of another thread. 
* @returns cms::ExceptionListener pointer or NULL */ - virtual cms::ExceptionListener* getExceptionListener(void); + virtual cms::ExceptionListener* getExceptionListener(); /** * Gets the Session Information object for this session, if the * session is closed than this returns null * @return SessionInfo Pointer */ - virtual connector::SessionInfo* getSessionInfo(void) { + virtual connector::SessionInfo* getSessionInfo() { return sessionInfo; } Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Map.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Map.h?view=diff&rev=499559&r1=499558&r2=499559 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Map.h (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Map.h Wed Jan 24 12:36:44 2007 @@ -21,6 +21,8 @@ #include <map> #include <vector> #include <activemq/exceptions/NoSuchElementException.h> +#include <activemq/concurrent/Synchronizable.h> +#include <activemq/concurrent/Mutex.h> namespace activemq{ namespace util{ @@ -30,18 +32,39 @@ * a more user-friendly interface and to provide common * functions that do not exist in std::map. */ - template <typename K, typename V> class Map + template <typename K, typename V> class Map : public concurrent::Synchronizable { private: std::map<K,V> valueMap; + concurrent::Mutex mutex; public: + /** + * Default constructor - does nothing. + */ Map(){}; + + /** + * Copy constructor - copies the content of the given map into this + * one. + * @param source The source map. + */ + Map( const Map& source ){ + copy( source ); + } + virtual ~Map(){}; /** + * Copies the content of the source map into this map. Erases + * all existing data in this map. + * @param source The source object to copy from. 
+ */ + virtual void copy( const Map& source ); + + /** * Removes all keys and values from this map. */ virtual void clear(); @@ -105,7 +128,87 @@ * @return the entire set of values in this map as a std::vector. */ virtual std::vector<V> getValues() const; + + public: // Methods from Synchronizable + + /** + * Locks the object. + * @throws ActiveMQException + */ + virtual void lock() throw(exceptions::ActiveMQException) { + mutex.lock(); + } + + /** + * Unlocks the object. + * @throws ActiveMQException + */ + virtual void unlock() throw(exceptions::ActiveMQException) { + mutex.unlock(); + } + + /** + * Waits on a signal from this object, which is generated + * by a call to Notify. Must have this object locked before + * calling. + * @throws ActiveMQException + */ + virtual void wait() throw(exceptions::ActiveMQException) { + mutex.wait(); + } + + /** + * Waits on a signal from this object, which is generated + * by a call to Notify. Must have this object locked before + * calling. This wait will timeout after the specified time + * interval. + * @param millisecs the time in millisecsonds to wait, or + * WAIT_INIFINITE + * @throws ActiveMQException + */ + virtual void wait(unsigned long millisecs) + throw(exceptions::ActiveMQException) { + mutex.wait(millisecs); + } + + /** + * Signals a waiter on this object that it can now wake + * up and continue. Must have this object locked before + * calling. + * @throws ActiveMQException + */ + virtual void notify() throw( exceptions::ActiveMQException ) { + mutex.notify(); + } + + /** + * Signals the waiters on this object that it can now wake + * up and continue. Must have this object locked before + * calling. 
+ * @throws ActiveMQException + */ + virtual void notifyAll() throw( exceptions::ActiveMQException ) { + mutex.notifyAll(); + } }; + + //////////////////////////////////////////////////////////////////////////// + template <typename K, typename V> + void Map<K,V>::copy( const Map<K,V>& source ) { + + // Get an iterator to the beginning of the source map. + typename std::map<K,V>::const_iterator iter; + iter = source.valueMap.begin(); + + // Erase the content of this object. + clear(); + + // Add all of the entries to this map. + for( ; iter != source.valueMap.end(); iter++ ){ + setValue( iter->first, iter->second ); + } + + } //////////////////////////////////////////////////////////////////////////// template <typename K, typename V> Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Set.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Set.h?view=auto&rev=499559 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Set.h (added) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/Set.h Wed Jan 24 12:36:44 2007 @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_UTIL_SET_H_
+#define ACTIVEMQ_UTIL_SET_H_
+
+#include <set>
+#include <vector>
+#include <activemq/exceptions/NoSuchElementException.h>
+#include <activemq/concurrent/Synchronizable.h>
+#include <activemq/concurrent/Mutex.h>
+
+namespace activemq{
+namespace util{
+
+    /**
+     * Set template that wraps around a std::set to provide
+     * a more user-friendly interface and to provide common
+     * functions that do not exist in std::set.
+     *
+     * The class implements Synchronizable by delegating to an internal
+     * mutex, but none of the container operations below take that lock
+     * themselves.  Callers that share a Set between threads must lock it
+     * (for example with a synchronized block) around each operation.
+     */
+    template <typename E> class Set : public concurrent::Synchronizable
+    {
+    private:
+
+        std::set<E> values;
+        concurrent::Mutex mutex;
+
+    public:
+
+        /**
+         * Default constructor - does nothing.
+         */
+        Set(){};
+
+        /**
+         * Copy constructor - copies the content of the given set into this
+         * one.  Only the values are copied; this object gets its own,
+         * default constructed mutex.
+         * @param source The source set.
+         */
+        Set( const Set& source ){
+            copy( source );
+        }
+
+        virtual ~Set(){};
+
+        /**
+         * Copies the content of the source set into this set.  Erases
+         * all existing data in this set.
+         * @param source The source object to copy from.
+         */
+        virtual void copy( const Set& source );
+
+        /**
+         * Removes all values from this set.
+         */
+        virtual void clear();
+
+        /**
+         * Indicates whether or not this set contains the given value.
+         * @param value The value to look up.
+         * @return true if this set contains the value, otherwise false.
+         */
+        virtual bool contains( const E& value ) const;
+
+        /**
+         * @return true if the set contains no elements, otherwise false.
+         */
+        virtual bool isEmpty() const;
+
+        /**
+         * @return The number of elements in this set.
+         */
+        virtual unsigned int size() const;
+
+        /**
+         * Adds the given value to the set.  Adding a value that is
+         * already present leaves the set unchanged.
+         * @param value The value to add.
+         */
+        virtual void add( const E& value );
+
+        /**
+         * Removes the value from the set.  Removing a value that is not
+         * present is a no-op.
+         * @param value The value to be removed.
+         */
+        virtual void remove( const E& value );
+
+        /**
+         * @return a copy of all values in this set as a std::vector.  The
+         * vector holds exactly size() elements, in the iteration order of
+         * the underlying std::set.
+         */
+        virtual std::vector<E> toArray() const;
+
+    public:     // Methods from Synchronizable
+
+        /**
+         * Locks the object.
+         * @throws ActiveMQException
+         */
+        virtual void lock() throw(exceptions::ActiveMQException) {
+            mutex.lock();
+        }
+
+        /**
+         * Unlocks the object.
+         * @throws ActiveMQException
+         */
+        virtual void unlock() throw(exceptions::ActiveMQException) {
+            mutex.unlock();
+        }
+
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to Notify.  Must have this object locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void wait() throw(exceptions::ActiveMQException) {
+            mutex.wait();
+        }
+
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to Notify.  Must have this object locked before
+         * calling.  This wait will timeout after the specified time
+         * interval.
+         * @param millisecs the time in milliseconds to wait, or
+         * WAIT_INIFINITE
+         * @throws ActiveMQException
+         */
+        virtual void wait(unsigned long millisecs)
+            throw(exceptions::ActiveMQException) {
+            mutex.wait(millisecs);
+        }
+
+        /**
+         * Signals a waiter on this object that it can now wake
+         * up and continue.  Must have this object locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void notify() throw( exceptions::ActiveMQException ) {
+            mutex.notify();
+        }
+
+        /**
+         * Signals the waiters on this object that it can now wake
+         * up and continue.  Must have this object locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void notifyAll() throw( exceptions::ActiveMQException ) {
+            mutex.notifyAll();
+        }
+    };
+
+    ////////////////////////////////////////////////////////////////////////////
+    template <typename E>
+    void Set<E>::copy( const Set<E>& source ) {
+
+        // Replace the content of this set with the entries of the source.
+        values = source.values;
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    template <typename E>
+    void Set<E>::clear(){
+        values.clear();
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    template <typename E>
+    bool Set<E>::contains(const E& value) const{
+        typename std::set<E>::const_iterator iter;
+        iter = values.find(value);
+        return iter != values.end();
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    template <typename E>
+    bool Set<E>::isEmpty() const{
+        return values.empty();
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    template <typename E>
+    unsigned int Set<E>::size() const{
+        // std::set::size() returns size_type; make the narrowing to the
+        // declared return type explicit.
+        return static_cast<unsigned int>( values.size() );
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    template <typename E>
+    void Set<E>::add( const E& value ){
+        values.insert(value);
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    template <typename E>
+    void Set<E>::remove( const E& value ){
+        values.erase(value);
+    }
+
+    ////////////////////////////////////////////////////////////////////////////
+    template <typename E>
+    std::vector<E> Set<E>::toArray() const{
+
+        // Build the vector directly from the set's range.  Do NOT size the
+        // vector up front and then push_back: that yields 2*size() entries
+        // whose first half are value-initialized elements (NULL for pointer
+        // types), which callers would then dereference.
+        return std::vector<E>( values.begin(), values.end() );
+    }
+
+}}
+
+#endif /*ACTIVEMQ_UTIL_SET_H_*/
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/vs2005-build/vs2005-activemq.vcproj
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/vs2005-build/vs2005-activemq.vcproj?view=diff&rev=499559&r1=499558&r2=499559
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/vs2005-build/vs2005-activemq.vcproj (original)
+++ 
incubator/activemq/activemq-cpp/trunk/activemq-cpp/vs2005-build/vs2005-activemq.vcproj Wed Jan 24 12:36:44 2007 @@ -328,6 +328,10 @@ > </File> <File + RelativePath="..\src\main\activemq\util\Set.h" + > + </File> + <File RelativePath="..\src\main\activemq\util\SimpleProperties.h" > </File>