Author: tabish
Date: Sat Jan 24 20:32:13 2009
New Revision: 737433
URL: http://svn.apache.org/viewvc?rev=737433&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-100
Improvements to the shutdown code.
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.cpp
activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.h
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.cpp?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.cpp Sat Jan 24 20:32:13 2009
@@ -17,6 +17,7 @@
#include <activemq/commands/ActiveMQTempDestination.h>
#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/core/ActiveMQConnection.h>
using namespace std;
using namespace activemq;
@@ -45,7 +46,9 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQTempDestination::close() throw( cms::CMSException ) {
try {
- // TODO - Dispose of this Temp Dest.
+ if( this->connection != NULL ) {
+ this->connection->destroyDestination( this );
+ }
}
AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.h?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.h Sat Jan 24 20:32:13 2009
@@ -30,6 +30,9 @@
#include <string>
namespace activemq{
+namespace core{
+ class ActiveMQConnection;
+}
namespace commands{
class AMQCPP_API ActiveMQTempDestination : public ActiveMQDestination,
@@ -37,10 +40,10 @@
protected:
/**
- * Connector that we call back on close to allow this resource to
+ * Connection that we call back on close to allow this resource to
* be cleaned up correctly at this end and at the Broker End.
*/
- // TODO - Add something to ask for a way to send a dispose
+ core::ActiveMQConnection* connection;
public:
@@ -99,11 +102,22 @@
/**
* Closes down this Destination resulting in a call to dispose of the
- * TempDestination resource at the Broker.
+ * TempDestination resource at the Broker. This should only be called
+ * when the user is certain that they are finished with this destination.
+ * The TempDestination is not closed automatically on shutdown.
* throws cms::CMSException
*/
virtual void close() throw( cms::CMSException );
+ /**
+ * Sets the Parent Connection that is notified when this destination is
+ * destroyed.
+ * @param connection - The parent connection.
+ */
+ void setConnection( core::ActiveMQConnection* connection ) {
+ this->connection = connection;
+ }
+
};
}}
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp Sat Jan 24 20:32:13 2009
@@ -373,7 +373,7 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::destroyDestination( const cms::Destination*
destination )
+void ActiveMQConnection::destroyDestination( const
commands::ActiveMQDestination* destination )
throw( decaf::lang::exceptions::NullPointerException,
decaf::lang::exceptions::IllegalStateException,
decaf::lang::exceptions::UnsupportedOperationException,
@@ -388,14 +388,11 @@
enforceConnected();
- const commands::ActiveMQDestination* amqDestination =
- dynamic_cast<const commands::ActiveMQDestination*>( destination );
-
commands::DestinationInfo command;
command.setConnectionId(
connectionInfo.getConnectionId()->cloneDataStructure() );
command.setOperationType(
ActiveMQConstants::DESTINATION_REMOVE_OPERATION );
- command.setDestination( amqDestination->cloneDataStructure() );
+ command.setDestination( destination->cloneDataStructure() );
// Send the message to the broker.
syncRequest( &command );
@@ -408,6 +405,34 @@
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::destroyDestination( const cms::Destination*
destination )
+ throw( decaf::lang::exceptions::NullPointerException,
+ decaf::lang::exceptions::IllegalStateException,
+ decaf::lang::exceptions::UnsupportedOperationException,
+ activemq::exceptions::ActiveMQException ) {
+
+ try{
+
+ if( destination == NULL ) {
+ throw NullPointerException(
+ __FILE__, __LINE__, "Destination passed was NULL" );
+ }
+
+ enforceConnected();
+
+ const commands::ActiveMQDestination* amqDestination =
+ dynamic_cast<const commands::ActiveMQDestination*>( destination );
+
+ this->destroyDestination( amqDestination );
+ }
+ AMQ_CATCH_RETHROW( NullPointerException )
+ AMQ_CATCH_RETHROW( IllegalStateException )
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::onCommand( transport::Command* command ) {
try{
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h Sat Jan 24 20:32:13 2009
@@ -196,6 +196,30 @@
* @throws ActiveMQException
* If any other error occurs during the attempt to destroy the destination.
*/
+ virtual void destroyDestination( const commands::ActiveMQDestination*
destination )
+ throw( decaf::lang::exceptions::NullPointerException,
+ decaf::lang::exceptions::IllegalStateException,
+ decaf::lang::exceptions::UnsupportedOperationException,
+ activemq::exceptions::ActiveMQException );
+
+ /**
+ * Requests that the Broker removes the given Destination. Calling this
+ * method implies that the client is finished with the Destination and that
+ * no other messages will be sent or received for the given Destination. The
+ * Broker frees all resources it has associated with this Destination.
+ *
+ * @param destination
+ * The CMS Destination the Broker will be requested to remove.
+ *
+ * @throws NullPointerException
+ * If the passed Destination is Null
+ * @throws IllegalStateException
+ * If the connection is closed.
+ * @throws UnsupportedOperationException
+ * If the wire format in use does not support this operation.
+ * @throws ActiveMQException
+ * If any other error occurs during the attempt to destroy the destination.
+ */
virtual void destroyDestination( const cms::Destination* destination )
throw( decaf::lang::exceptions::NullPointerException,
decaf::lang::exceptions::IllegalStateException,
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp Sat Jan 24 20:32:13 2009
@@ -55,16 +55,12 @@
"ActiveMQConsumer::ActiveMQConsumer - Init with NULL Session");
}
- // Init Producer Data
+ // Initialize Producer Data
this->session = session;
this->transaction = transaction;
this->consumerInfo.reset( consumerInfo );
this->listener = NULL;
this->closed = false;
-
- // TODO - How to Detect Close
- // Listen for our resource to close
- //this->consumerInfo->addListener( this );
}
////////////////////////////////////////////////////////////////////////////////
@@ -83,33 +79,18 @@
try{
- if( !closed ) {
+ if( !this->isClosed() ) {
+
+ // Remove this Consumer from the Connections set of Dispatchers and then
+ // remove it from the Broker.
+ this->session->disposeOf( this->getConsumerId() );
+
+ this->closed = true;
// Identifies any errors encountered during shutdown.
bool haveException = false;
ActiveMQException error;
- // TODO
- // Close the ConsumerInfo
-// if( !consumerInfo->isClosed() ) {
-// try{
-// // We don't want a callback now
-// this->consumerInfo->removeListener( this );
-// this->consumerInfo->close();
-// } catch( ActiveMQException& ex ){
-// if( !haveException ){
-// ex.setMark( __FILE__, __LINE__ );
-// error = ex;
-// haveException = true;
-// }
-// }
-// }
-
- // Remove from Broker.
- this->session->getConnection()->disposeOf(
this->consumerInfo->getConsumerId() );
-
- closed = true;
-
// Purge all the pending messages
try{
purgeMessages();
@@ -563,7 +544,7 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::checkClosed() throw( exceptions::ActiveMQException ) {
- if( closed ) {
+ if( this->isClosed() ) {
throw ActiveMQException(
__FILE__, __LINE__,
"ActiveMQConsumer - Consumer Already Closed" );
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h Sat Jan 24 20:32:13 2009
@@ -182,17 +182,36 @@
* Get the Consumer information for this consumer
* @return Pointer to a Consumer Info Object
*/
- virtual commands::ConsumerInfo* getConsumerInfo() {
+ commands::ConsumerInfo* getConsumerInfo() {
return consumerInfo.get();
}
+ /**
+ * Get the Consumer Id for this consumer
+ * @return Pointer to a Consumer Id Object
+ */
+ commands::ConsumerId* getConsumerId() {
+ if( this->isClosed() ) {
+ return NULL;
+ }
+
+ return consumerInfo->getConsumerId();
+ }
+
+ /**
+ * @returns if this Consumer has been closed.
+ */
+ bool isClosed() const {
+ return this->closed;
+ }
+
protected:
/**
* Purges all messages currently in the queue. This can be as a
* result of a rollback, or of the consumer being shutdown.
*/
- virtual void purgeMessages() throw (exceptions::ActiveMQException);
+ void purgeMessages() throw ( exceptions::ActiveMQException );
/**
* Used by synchronous receive methods to wait for messages to come in.
@@ -213,14 +232,14 @@
* Pre-consume processing
* @param message - the message being consumed.
*/
- virtual void beforeMessageIsConsumed( ActiveMQMessage* message );
+ void beforeMessageIsConsumed( ActiveMQMessage* message );
/**
* Post-consume processing
* @param message - the consumed message
* @param messageExpired - flag indicating if the message has expired.
*/
- virtual void afterMessageIsConsumed( ActiveMQMessage* message, bool
messageExpired );
+ void afterMessageIsConsumed( ActiveMQMessage* message, bool
messageExpired );
private:
@@ -233,7 +252,7 @@
* for a server round-trip in that instance.
* @param timeout - the time that the client is willing to wait.
*/
- virtual void sendPullRequest( long long timeout )
+ void sendPullRequest( long long timeout )
throw ( exceptions::ActiveMQException );
// Checks for the closed state and throws if so.
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp Sat Jan 24 20:32:13 2009
@@ -54,10 +54,6 @@
this->disableMessageId = false;
this->defaultPriority = 4;
this->defaultTimeToLive = 0;
-
- // TODO - How to manage resources
- // Listen for our resource to close
- //this->producerInfo->addListener( this );
}
////////////////////////////////////////////////////////////////////////////////
@@ -74,10 +70,10 @@
try{
- if( !closed ) {
+ if( !this->isClosed() ) {
- this->session->getConnection()->disposeOf(
this->producerInfo->getProducerId() );
- closed = true;
+ this->session->disposeOf( this->producerInfo->getProducerId() );
+ this->closed = true;
}
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -154,7 +150,7 @@
try {
- checkClosed();
+ this->checkClosed();
if( destination == NULL ) {
@@ -189,30 +185,6 @@
AMQ_CATCHALL_THROW( ActiveMQException )
}
-// TODO
-////////////////////////////////////////////////////////////////////////////////
-//void ActiveMQProducer::onConnectorResourceClosed(
-// const ConnectorResource* resource ) throw ( cms::CMSException ) {
-//
-// try{
-//
-// checkClosed();
-//
-// if( resource != producerInfo ) {
-// throw ActiveMQException(
-// __FILE__, __LINE__,
-// "ActiveMQProducer::onConnectorResourceClosed - "
-// "Unknown object passed to this callback");
-// }
-//
-// // If our producer isn't closed already, then lets close
-// this->close();
-// }
-// AMQ_CATCH_RETHROW( ActiveMQException )
-// AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-// AMQ_CATCHALL_THROW( ActiveMQException )
-//}
-
////////////////////////////////////////////////////////////////////////////////
void ActiveMQProducer::onProducerAck( const commands::ProducerAck& ack ) {
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h Sat Jan 24 20:32:13 2009
@@ -243,6 +243,13 @@
public:
/**
+ * @returns true if this Producer has been closed.
+ */
+ bool isClosed() const {
+ return this->closed;
+ }
+
+ /**
* Retries this object ProducerInfo pointer
* @return ProducerInfo pointer
*/
@@ -251,6 +258,18 @@
}
/**
+ * Retries this object ProducerId or NULL if closed.
+ * @return ProducerId pointer
+ */
+ virtual commands::ProducerId* getProducerId(){
+ if( this->isClosed() ) {
+ return NULL;
+ }
+
+ return this->producerInfo->getProducerId();
+ }
+
+ /**
* Handles the work of Processing a ProducerAck Command from the Broker.
* @param ack - The ProducerAck message received from the Broker.
*/
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp Sat Jan 24 20:32:13 2009
@@ -117,22 +117,33 @@
// Stop the dispatch executor.
stop();
- // TODO
- // Get the complete list of closeable session resources.
- // Get the complete list of closeable session resources.
-// synchronized( &closableSessionResources ) {
-//
-// Iterator<cms::Closeable*>* iter =
closableSessionResources.iterator();
-// while( iter->hasNext() ) {
-// cms::Closeable* resource = iter->next();
-// try{
-// resource->close();
-// } catch( cms::CMSException& ex ){
-// /* Absorb */
-// }
-// }
-// delete iter;
-// }
+ // Close all Consumers
+ synchronized( &this->consumers ) {
+
+ std::vector<ActiveMQConsumer*> closables =
this->consumers.getValues();
+
+ for( std::size_t i = 0; i < closables.size(); ++i ) {
+ try{
+ closables[i]->close();
+ } catch( cms::CMSException& ex ){
+ /* Absorb */
+ }
+ }
+ }
+
+ // Close all Producers
+ synchronized( &this->producers ) {
+
+ std::vector<ActiveMQProducer*> closables =
this->producers.getValues();
+
+ for( std::size_t i = 0; i < closables.size(); ++i ) {
+ try{
+ closables[i]->close();
+ } catch( cms::CMSException& ex ){
+ /* Absorb */
+ }
+ }
+ }
// TODO = Commit it first.
// Destroy the Transaction
@@ -374,6 +385,9 @@
producer->getProducerInfo()->getProducerId()->getValue(),
producer.get() );
}
+ // Add to the Connections list
+ this->connection->addProducer( producer.get() );
+
return producer.release();
}
AMQ_CATCH_RETHROW( ActiveMQException )
@@ -624,86 +638,6 @@
}
////////////////////////////////////////////////////////////////////////////////
-//void ActiveMQSession::onConnectorResourceClosed(
-// const ConnectorResource* resource ) throw ( cms::CMSException ) {
-//
-// try{
-//
-// if( closed ) {
-// throw ActiveMQException(
-// __FILE__, __LINE__,
-// "ActiveMQSession::onProducerClose - Session Already Closed");
-// }
-//
-// const ConsumerInfo* consumer =
-// dynamic_cast<const ConsumerInfo*>( resource );
-//
-// if( consumer != NULL ) {
-//
-// // If the executor thread is currently running, stop it.
-// bool wasStarted = isStarted();
-// if( wasStarted ) {
-// stop();
-// }
-//
-// // Remove the dispatcher for the Connection
-// connection->removeDispatcher( consumer );
-//
-// // Remove this consumer from the Transaction if we are transacted
-// if( transaction != NULL ) {
-// transaction->removeFromTransaction(
consumer->getConsumerId() );
-// }
-//
-// ActiveMQConsumer* obj = NULL;
-// synchronized( &consumers ) {
-//
-// if( consumers.containsKey( consumer->getConsumerId() ) ) {
-//
-// // Get the consumer reference
-// obj = consumers.getValue( consumer->getConsumerId() );
-//
-// // Remove this consumer from the map.
-// consumers.remove( consumer->getConsumerId() );
-// }
-// }
-//
-// // Clean up any resources in the executor for this consumer
-// if( obj != NULL && executor != NULL ) {
-//
-// // Purge any pending messages for this consumer.
-// vector<ActiveMQMessage*> messages =
-// executor->purgeConsumerMessages(obj);
-//
-// // Destroy the messages.
-// for( unsigned int ix=0; ix<messages.size(); ++ix ) {
-// delete messages[ix];
-// }
-// }
-//
-// // If the executor thread was previously running, start it back
-// // up.
-// if( wasStarted ) {
-// start();
-// }
-// }
-//
-// // Remove the entry from the session resource map if it's there
-// const cms::Closeable* closeable =
-// dynamic_cast<const cms::Closeable*>( resource );
-//
-// if( closeable != NULL ){
-// synchronized( &closableSessionResources ) {
-// closableSessionResources.remove(
-// const_cast<cms::Closeable*>( closeable ) );
-// }
-// }
-// }
-// AMQ_CATCH_RETHROW( ActiveMQException )
-// AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-// AMQ_CATCHALL_THROW( ActiveMQException )
-//}
-
-////////////////////////////////////////////////////////////////////////////////
cms::ExceptionListener* ActiveMQSession::getExceptionListener()
{
if( connection != NULL ) {
@@ -940,9 +874,8 @@
// Send the message to the broker.
this->connection->syncRequest( &command );
- // TODO - Manage Resources
- // Now that its setup, link it to this Connector
- // tempDestination->setConnector( this );
+ // Now that its setup, link it to this Connection so it can be closed.
+ tempDestination->setConnection( this->connection );
}
AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception,
activemq::exceptions::ActiveMQException )
@@ -1034,3 +967,82 @@
"ActiveMQSession - Session Already Closed" );
}
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::disposeOf( commands::ConsumerId* id )
+ throw ( activemq::exceptions::ActiveMQException ) {
+
+ try{
+
+ this->checkClosed();
+
+ synchronized( &this->consumers ) {
+
+ if( this->consumers.containsKey( id->getValue() ) ) {
+
+ // If the executor thread is currently running, stop it.
+ bool wasStarted = isStarted();
+ if( wasStarted ) {
+ stop();
+ }
+
+ ActiveMQConsumer* consumer = this->consumers.getValue(
id->getValue() );
+ this->connection->removeDispatcher(
consumer->getConsumerInfo() );
+ this->connection->disposeOf( id );
+
+ this->consumers.remove( id->getValue() );
+
+ //TODO
+// // Remove this consumer from the Transaction if we are
transacted
+// if( transaction != NULL ) {
+// transaction->removeFromTransaction(
consumer->getConsumerId() );
+// }
+//
+ // Clean up any resources in the executor for this consumer
+ if( this->executor.get() != NULL ) {
+
+ // Purge any pending messages for this consumer.
+ vector<ActiveMQMessage*> messages =
+ this->executor->purgeConsumerMessages( consumer );
+
+ // Destroy the messages.
+ for( unsigned int ix = 0; ix < messages.size(); ++ix ) {
+ delete messages[ix];
+ }
+ }
+
+ if( wasStarted ) {
+ start();
+ }
+ }
+ }
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::disposeOf( commands::ProducerId* id )
+ throw ( activemq::exceptions::ActiveMQException ) {
+
+ try{
+
+ this->checkClosed();
+
+ synchronized( &this->producers ) {
+
+ if( this->producers.containsKey( id->getValue() ) ) {
+
+ ActiveMQProducer* producer = this->producers.getValue(
id->getValue() );
+ this->connection->removeProducer( producer );
+ this->connection->disposeOf( id );
+
+ this->producers.remove( id->getValue() );
+ }
+ }
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h Sat Jan 24 20:32:13 2009
@@ -26,6 +26,8 @@
#include <activemq/commands/ActiveMQTempDestination.h>
#include <activemq/commands/SessionInfo.h>
#include <activemq/commands/ConsumerInfo.h>
+#include <activemq/commands/ConsumerId.h>
+#include <activemq/commands/ProducerId.h>
#include <activemq/commands/TransactionId.h>
#include <activemq/core/Dispatcher.h>
@@ -78,16 +80,11 @@
decaf::util::Map<long long, ActiveMQConsumer*> consumers;
/**
- * Map of consumers.
+ * Map of producers.
*/
decaf::util::Map<long long, ActiveMQProducer*> producers;
/**
- * Map of consumers.
- */
- decaf::util::Map<long long, commands::ActiveMQTempDestination*>
tempDestinations;
-
- /**
* Sends incoming messages to the registered consumers.
*/
std::auto_ptr<ActiveMQSessionExecutor> executor;
@@ -412,6 +409,22 @@
void syncRequest( transport::Command* command, unsigned int timeout =
0 )
throw ( activemq::exceptions::ActiveMQException );
+ /**
+ * Dispose of a Consumer from this session. Removes it from the Connection
+ * and clean up any resources associated with it.
+ * @param consumerId - the Id of the Consumer to dispose.
+ */
+ void disposeOf( commands::ConsumerId* id )
+ throw ( activemq::exceptions::ActiveMQException );
+
+ /**
+ * Dispose of a Producer from this session. Removes it from the Connection
+ * and clean up any resources associated with it.
+ * @param consumerId - the Id of the Producer to dispose.
+ */
+ void disposeOf( commands::ProducerId* id )
+ throw ( activemq::exceptions::ActiveMQException );
+
private:
// Checks for the closed state and throws if so.