Author: nmittler Date: Wed Jan 24 13:54:58 2007 New Revision: 499583 URL: http://svn.apache.org/viewvc?view=rev&rev=499583 Log: Making Connection.close() also close any non-closed child sessions.
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?view=diff&rev=499583&r1=499582&r2=499583 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Wed Jan 24 13:54:58 2007 @@ -73,10 +73,18 @@ { try { - return new ActiveMQSession( + // Create the session instance. + ActiveMQSession* session = new ActiveMQSession( connectionData->getConnector()->createSession( ackMode ), connectionData->getProperties(), this ); + + // Add the session to the set of active sessions. + synchronized( &activeSessions ) { + activeSessions.add( session ); + } + + return session; } AMQ_CATCH_RETHROW( ActiveMQException ) AMQ_CATCHALL_THROW( ActiveMQException ) @@ -97,6 +105,22 @@ { return; } + + // Get the complete list of active sessions. + std::vector<cms::Session*> allSessions; + synchronized( &activeSessions ) { + allSessions = activeSessions.toArray(); + } + + // Close all of the resources. + for( unsigned int ix=0; ix<allSessions.size(); ++ix ){ + cms::Session* session = allSessions[ix]; + try{ + session->close(); + } catch( cms::CMSException& ex ){ + /* Absorb */ + } + } // Once current deliveries are done this stops the delivery // of any new messages. @@ -136,9 +160,9 @@ ActiveMQMessageListener* listener ) { // Place in Map - synchronized( &mutex ) + synchronized( &consumers ) { - consumers[consumerId] = listener; + consumers.setValue( consumerId, listener ); } } @@ -146,9 +170,9 @@ void ActiveMQConnection::removeMessageListener( const unsigned int consumerId ) { // Remove from Map - synchronized( &mutex ) + synchronized( &consumers ) { - consumers.erase( consumerId ); + consumers.remove( consumerId ); } } @@ -187,12 +211,14 @@ } // Started, so lock map and dispatch the message. - synchronized( &mutex ) + synchronized( &consumers ) { - if(consumers.find( consumer->getConsumerId()) != consumers.end() ) + if( consumers.containsKey(consumer->getConsumerId()) ) { - consumers[consumer->getConsumerId()]-> - onActiveMQMessage( message ); + ActiveMQMessageListener* listener = + consumers.getValue(consumer->getConsumerId()); + + listener->onActiveMQMessage( message ); } } } @@ -217,5 +243,27 @@ if( exceptionListener != NULL ){ exceptionListener->onException( ex ); } -} +} + +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQConnection::removeSession( ActiveMQSession* session ) + throw ( cms::CMSException ) +{ + try + { + // Remove this session from the set of active sessions. + synchronized( &activeSessions ) { + activeSessions.remove( session ); + } + + // Destroy this sessions resources + getConnectionData()-> + getConnector()->destroyResource( session->getSessionInfo() ); + + } + AMQ_CATCH_RETHROW( ActiveMQException ) + AMQ_CATCHALL_THROW( ActiveMQException ) +} + + Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?view=diff&rev=499583&r1=499582&r2=499583 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Wed Jan 24 13:54:58 2007 @@ -14,27 +14,33 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_ #define _ACTIVEMQ_CORE_ACTIVEMQCONNECTION_H_ #include <cms/Connection.h> #include <cms/ExceptionListener.h> -#include <activemq/concurrent/Mutex.h> #include <activemq/core/ActiveMQConnectionData.h> #include <activemq/core/ActiveMQMessageListener.h> #include <activemq/core/ActiveMQMessage.h> #include <activemq/connector/ConsumerMessageListener.h> #include <activemq/util/Properties.h> +#include <activemq/util/Map.h> +#include <activemq/util/Set.h> -#include <map> #include <string> namespace activemq{ namespace core{ - class cms::Session; + class cms::Session; + class ActiveMQSession; class ActiveMQConsumer; + /** + * Concrete connection used for all connectors to the + * ActiveMQ broker. + */ class ActiveMQConnection : public cms::Connection, public connector::ConsumerMessageListener, @@ -42,24 +48,36 @@ { private: - // the registered exception listener + /** + * the registered exception listener + */ cms::ExceptionListener* exceptionListener; - // All the data that is used to connect this Connection + /** + * All the data that is used to connect this Connection + */ ActiveMQConnectionData* connectionData; - // Indicates if this Connection is started + /** + * Indicates if this Connection is started + */ bool started; - // Indicates that this connection has been closed, it is no longer - // usable after this becomes true + /** + * Indicates that this connection has been closed, it is no longer + * usable after this becomes true + */ bool closed; - // Map of Consumer Ids to ActiveMQMessageListeners - std::map< unsigned int, ActiveMQMessageListener* > consumers; - - // Mutex to lock the Consumers Map - concurrent::Mutex mutex; + /** + * Map of Consumer Ids to ActiveMQMessageListeners + */ + util::Map< unsigned int, ActiveMQMessageListener* > consumers; + + /** + * Maintain the set of all active sessions. + */ + util::Set<cms::Session*> activeSessions; public: @@ -71,6 +89,13 @@ virtual ~ActiveMQConnection(); + /** + * Removes the session resources for the given session + * instance. + * @param session The session to be unregistered from this connection. + */ + virtual void removeSession( ActiveMQSession* session ) throw ( cms::CMSException ); + public: // Connection Interface Methods /** @@ -118,7 +143,9 @@ }; /** - * Close the currently open connection + * Closes this connection as well as any Sessions + * created from it (and those Sessions' consumers and + * producers). * @throws CMSException */ virtual void close() throw ( cms::CMSException ); Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=499583&r1=499582&r2=499583 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Wed Jan 24 13:54:58 2007 @@ -105,8 +105,7 @@ } // Destroy this sessions resources - connection->getConnectionData()-> - getConnector()->destroyResource( sessionInfo ); + connection->removeSession( this ); sessionInfo = NULL; // Now indicate that this session is closed. Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?view=diff&rev=499583&r1=499582&r2=499583 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Wed Jan 24 13:54:58 2007 @@ -74,8 +74,9 @@ public: // Implements Mehtods /** - * Closes the Session - * @throw CMSException + * Closes this session as well as any active child consumers or + * producers. + * @throws CMSException */ virtual void close() throw ( cms::CMSException ); Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h?view=diff&rev=499583&r1=499582&r2=499583 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h Wed Jan 24 13:54:58 2007 @@ -37,10 +37,18 @@ virtual ~Connection(void) {} /** + * Closes this connection as well as any Sessions + * created from it (and those Sessions' consumers and + * producers). + * @throws CMSException + */ + virtual void close() throw( CMSException ) = 0; + + /** * Creates a new Session to work for this Connection * @throws CMSException */ - virtual Session* createSession(void) throw ( CMSException ) = 0; + virtual Session* createSession() throw ( CMSException ) = 0; /** * Creates a new Session to work for this Connection using the @@ -55,13 +63,13 @@ * Get the Client Id for this session * @return Client Id String */ - virtual std::string getClientId(void) const = 0; + virtual std::string getClientId() const = 0; /** * Gets the registered Exception Listener for this connection * @return pointer to an exception listnener or NULL */ - virtual ExceptionListener* getExceptionListener(void) const = 0; + virtual ExceptionListener* getExceptionListener() const = 0; /** * Sets the registed Exception Listener for this connection Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h?view=diff&rev=499583&r1=499582&r2=499583 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h Wed Jan 24 13:54:58 2007 @@ -77,6 +77,13 @@ virtual ~Session(void) {} /** + * Closes this session as well as any active child consumers or + * producers. + * @throws CMSException + */ + virtual void close() throw( CMSException ) = 0; + + /** * Commits all messages done in this transaction and releases any * locks currently held. * @throws CMSException