Author: tabish
Date: Mon Nov 3 14:36:56 2008
New Revision: 710198
URL: http://svn.apache.org/viewvc?rev=710198&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-200
Revamp allocation pattern to make any leaks from exceptions or otherwise less
likely to occur.
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp?rev=710198&r1=710197&r2=710198&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Mon Nov 3 14:36:56 2008
@@ -18,6 +18,7 @@
#include <activemq/connector/openwire/OpenWireConnector.h>
#include <typeinfo>
+#include <memory>
#include <decaf/util/concurrent/Concurrent.h>
#include <activemq/transport/Transport.h>
#include <decaf/lang/exceptions/UnsupportedOperationException.h>
@@ -222,15 +223,12 @@
connectionInfo.setConnectionId( connectionId );
// Now we ping the broker and see if we get an ack / nack
- Response* response = syncRequest( &connectionInfo );
+ syncRequest( &connectionInfo );
synchronized( &mutex ) {
// Tag us in the Connected State now.
state = CONNECTION_STATE_CONNECTED;
}
-
- // Clean up the ack
- delete response;
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
@@ -279,38 +277,23 @@
enforceConnected();
// Create and initialize a new SessionInfo object
- commands::SessionInfo* info = new commands::SessionInfo();
- commands::SessionId* sessionId = new commands::SessionId();
+ std::auto_ptr<commands::SessionInfo> info( new commands::SessionInfo()
);
+ std::auto_ptr<commands::SessionId> sessionId( new
commands::SessionId() );
sessionId->setConnectionId(
connectionInfo.getConnectionId()->getValue() );
sessionId->setValue( sessionIds.getNextSequenceId() );
- info->setSessionId( sessionId );
+ info->setSessionId( sessionId.release() );
// Create and initialize the Connector's Session Info object, this will
// cleanup the SessionInfo command when destroyed.
- OpenWireSessionInfo* session = new OpenWireSessionInfo( this );
- session->setSessionInfo( info );
+ std::auto_ptr<OpenWireSessionInfo> session( new OpenWireSessionInfo(
this ) );
+ session->setSessionInfo( info.release() );
session->setAckMode( ackMode );
- try{
-
- // Send the subscription message to the broker.
- Response* response = syncRequest( info );
-
- // The broker did not return an error - this is good.
- // Just discard the response.
- delete response;
-
- // Return the session info.
- return session;
-
- } catch( ConnectorException& ex ) {
-
- // Something bad happened - free the session info object.
- delete session;
+ // Send the subscription message to the broker.
+ syncRequest( session->getSessionInfo() );
- ex.setMark(__FILE__, __LINE__);
- throw ex;
- }
+ // Return the session info.
+ return session.release();
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
@@ -325,17 +308,12 @@
bool noLocal )
throw ( ConnectorException ) {
- OpenWireConsumerInfo* consumer = NULL;
- commands::ConsumerInfo* consumerInfo = NULL;
-
try{
enforceConnected();
- consumer = new OpenWireConsumerInfo( this );
- consumer->setSessionInfo( session );
- consumerInfo = createConsumerInfo( destination, session );
- consumer->setConsumerInfo( consumerInfo );
+ std::auto_ptr<commands::ConsumerInfo> consumerInfo(
+ createConsumerInfo( destination, session ) );
consumerInfo->setSelector( selector );
consumerInfo->setNoLocal( noLocal );
@@ -343,33 +321,22 @@
/**
* Override default options with uri-encoded parameters.
*/
- applyDestinationOptions( consumerInfo );
+ applyDestinationOptions( consumerInfo.get() );
+
+ std::auto_ptr<OpenWireConsumerInfo> consumer( new
OpenWireConsumerInfo( this ) );
+ consumer->setSessionInfo( session );
+ consumer->setConsumerInfo( consumerInfo.release() );
synchronized( &consumerInfoMap ) {
// Optimistically place the Consumer into the Map.
- consumerInfoMap.setValue(
- consumerInfo->getConsumerId()->getValue(),
- consumer );
+ consumerInfoMap.setValue( consumer->getConsumerId(),
consumer.get() );
}
- return consumer;
-
- } catch( ConnectorException& ex ) {
- delete consumer;
- delete consumerInfo;
- ex.setMark( __FILE__, __LINE__ );
- throw ex;
- } catch( Exception& ex ) {
- delete consumer;
- delete consumerInfo;
- ex.setMark( __FILE__, __LINE__ );
- throw OpenWireConnectorException( ex );
- } catch( ... ) {
- delete consumer;
- delete consumerInfo;
- throw OpenWireConnectorException( __FILE__, __LINE__,
- "caught unknown exception" );
+ return consumer.release();;
}
+ AMQ_CATCH_RETHROW( ConnectorException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
+ AMQ_CATCHALL_THROW( OpenWireConnectorException )
}
////////////////////////////////////////////////////////////////////////////////
@@ -380,18 +347,12 @@
const std::string& selector,
bool noLocal )
throw ( ConnectorException ) {
-
- OpenWireConsumerInfo* consumer = NULL;
- commands::ConsumerInfo* consumerInfo = NULL;
try{
enforceConnected();
- consumer = new OpenWireConsumerInfo( this );
- consumer->setSessionInfo( session );
- consumerInfo = createConsumerInfo( topic, session );
- consumer->setConsumerInfo( consumerInfo );
+ auto_ptr<commands::ConsumerInfo> consumerInfo( createConsumerInfo(
topic, session ) );
consumerInfo->setSelector( selector );
consumerInfo->setNoLocal( noLocal );
@@ -400,33 +361,22 @@
/**
* Override default options with uri-encoded parameters.
*/
- applyDestinationOptions( consumerInfo );
+ applyDestinationOptions( consumerInfo.get() );
+
+ auto_ptr<OpenWireConsumerInfo> consumer( new OpenWireConsumerInfo(
this ) );
+ consumer->setSessionInfo( session );
+ consumer->setConsumerInfo( consumerInfo.release() );
synchronized( &consumerInfoMap ) {
// Optimistically place the Consumer into the Map.
- consumerInfoMap.setValue(
- consumerInfo->getConsumerId()->getValue(),
- consumer );
+ consumerInfoMap.setValue( consumer->getConsumerId(),
consumer.get() );
}
- return consumer;
-
- } catch( ConnectorException& ex ) {
- delete consumer;
-
- ex.setMark( __FILE__, __LINE__ );
- throw ex;
- } catch( Exception& ex ) {
- delete consumer;
-
- ex.setMark( __FILE__, __LINE__ );
- throw OpenWireConnectorException( ex );
- } catch( ... ) {
- delete consumer;
-
- throw OpenWireConnectorException( __FILE__, __LINE__,
- "caught unknown exception" );
+ return consumer.release();
}
+ AMQ_CATCH_RETHROW( ConnectorException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
+ AMQ_CATCHALL_THROW( OpenWireConnectorException )
}
////////////////////////////////////////////////////////////////////////////////
@@ -555,20 +505,19 @@
connector::SessionInfo* session )
throw ( ConnectorException ) {
- commands::ConsumerInfo* consumerInfo = NULL;
-
try{
enforceConnected();
- consumerInfo = new commands::ConsumerInfo();
- commands::ConsumerId* consumerId = new commands::ConsumerId();
- consumerInfo->setConsumerId( consumerId );
+ std::auto_ptr<commands::ConsumerInfo> consumerInfo( new
commands::ConsumerInfo() );
+ std::auto_ptr<commands::ConsumerId> consumerId( new
commands::ConsumerId() );
consumerId->setConnectionId( session->getConnectionId() );
consumerId->setSessionId( session->getSessionId() );
consumerId->setValue( consumerIds.getNextSequenceId() );
+ consumerInfo->setConsumerId( consumerId.release() );
+
// Cast the destination to an OpenWire destination, so we can
// get all the goodies.
const commands::ActiveMQDestination* amqDestination =
@@ -581,30 +530,11 @@
consumerInfo->setDestination( amqDestination->cloneDataStructure() );
- return consumerInfo;
-
- } catch( ConnectorException& ex ) {
- delete consumerInfo;
-
- ex.setMark( __FILE__, __LINE__ );
- throw ex;
- } catch( Exception& ex ) {
- delete consumerInfo;
-
- ex.setMark( __FILE__, __LINE__ );
- throw OpenWireConnectorException( ex );
- } catch( std::exception& ex ) {
- delete consumerInfo;
-
- throw OpenWireConnectorException( __FILE__, __LINE__,
- ex.what() );
-
- } catch( ... ) {
- delete consumerInfo;
-
- throw OpenWireConnectorException( __FILE__, __LINE__,
- "caught unknown exception" );
+ return consumerInfo.release();
}
+ AMQ_CATCH_RETHROW( ConnectorException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
+ AMQ_CATCHALL_THROW( OpenWireConnectorException )
}
////////////////////////////////////////////////////////////////////////////////
@@ -634,11 +564,7 @@
}
// Send the message to the broker.
- Response* response = syncRequest( consumerInfo->getConsumerInfo() );
-
- // The broker did not return an error - this is good.
- // Just discard the response.
- delete response;
+ syncRequest( consumerInfo->getConsumerInfo() );
// Tag the Consumer as started
consumerInfo->setStarted( true );
@@ -654,28 +580,23 @@
connector::SessionInfo* session )
throw ( ConnectorException ) {
- OpenWireProducerInfo* producer = NULL;
- commands::ProducerInfo* producerInfo = NULL;
-
try{
enforceConnected();
- producer = new OpenWireProducerInfo( this );
+ std::auto_ptr<OpenWireProducerInfo> producer( new
OpenWireProducerInfo( this ) );
producer->setSessionInfo( session );
producer->setSendTimeout( this->getSendTimeout() );
- producerInfo = new commands::ProducerInfo();
- producer->setProducerInfo( producerInfo );
-
- commands::ProducerId* producerId = new commands::ProducerId();
- producerInfo->setProducerId( producerId );
- producerInfo->setWindowSize( this->getProducerWindowSize() );
-
+ std::auto_ptr<commands::ProducerId> producerId( new
commands::ProducerId() );
producerId->setConnectionId( session->getConnectionId() );
producerId->setSessionId( session->getSessionId() );
producerId->setValue( producerIds.getNextSequenceId() );
+ std::auto_ptr<commands::ProducerInfo> producerInfo( new
commands::ProducerInfo() );
+ producerInfo->setProducerId( producerId.release() );
+ producerInfo->setWindowSize( this->getProducerWindowSize() );
+
// Producers are allowed to have NULL destinations. In this case, the
// destination is specified by the messages as they are sent.
if( destination != NULL ) {
@@ -698,31 +619,15 @@
}
// Send the message to the broker.
- Response* response = syncRequest(producerInfo);
+ syncRequest( producerInfo.get() );
- // The broker did not return an error - this is good.
- // Just discard the response.
- delete response;
+ producer->setProducerInfo( producerInfo.release() );
- return producer;
-
- } catch( ConnectorException& ex ) {
- delete producer;
- ex.setMark( __FILE__, __LINE__ );
- throw ex;
- } catch( Exception& ex ) {
- delete producer;
- ex.setMark( __FILE__, __LINE__ );
- throw OpenWireConnectorException( ex );
- } catch( std::exception& ex ) {
- delete producer;
- throw OpenWireConnectorException( __FILE__, __LINE__,
- ex.what() );
- } catch( ... ) {
- delete producer;
- throw OpenWireConnectorException( __FILE__, __LINE__,
- "caught unknown exception" );
+ return producer.release();
}
+ AMQ_CATCH_RETHROW( ConnectorException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
+ AMQ_CATCHALL_THROW( OpenWireConnectorException )
}
////////////////////////////////////////////////////////////////////////////////
@@ -761,13 +666,13 @@
enforceConnected();
- commands::ActiveMQTempTopic* topic = new
- commands::ActiveMQTempTopic( createTemporaryDestinationName() );
+ std::auto_ptr<commands::ActiveMQTempTopic> topic( new
+ commands::ActiveMQTempTopic( createTemporaryDestinationName() ) );
// Register it with the Broker
- this->createTemporaryDestination( topic );
+ this->createTemporaryDestination( topic.get() );
- return topic;
+ return topic.release();
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
@@ -783,13 +688,13 @@
enforceConnected();
- commands::ActiveMQTempQueue* queue = new
- commands::ActiveMQTempQueue( createTemporaryDestinationName() );
+ std::auto_ptr<commands::ActiveMQTempQueue> queue( new
+ commands::ActiveMQTempQueue( createTemporaryDestinationName() ) );
// Register it with the Broker
- this->createTemporaryDestination( queue );
+ this->createTemporaryDestination( queue.get() );
- return queue;
+ return queue.release();
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
@@ -874,11 +779,8 @@
} else {
// Send the message to the broker.
- Response* response = syncRequest( amqMessage,
producerInfo->getSendTimeout() );
+ syncRequest( amqMessage, producerInfo->getSendTimeout() );
- // The broker did not return an error - this is good.
- // Just discard the response.
- delete response;
}
} catch( ConnectorException& ex ){
@@ -1102,27 +1004,27 @@
enforceConnected();
- OpenWireTransactionInfo* transaction =
- new OpenWireTransactionInfo( this );
+ std::auto_ptr<OpenWireTransactionInfo> transaction(
+ new OpenWireTransactionInfo( this ) );
// Place Transaction Data in session for later use as well as
// the session in the Transaction Data
- session->setTransactionInfo( transaction );
+ session->setTransactionInfo( transaction.get() );
transaction->setSessionInfo( session );
// Prepare and send the Transaction command
- commands::TransactionInfo* info = new commands::TransactionInfo();
+ std::auto_ptr<commands::TransactionInfo> info( new
commands::TransactionInfo() );
info->setConnectionId(
connectionInfo.getConnectionId()->cloneDataStructure() );
info->setTransactionId( createLocalTransactionId() );
info->setType( (int)TRANSACTION_STATE_BEGIN );
- oneway( info );
+ oneway( info.get() );
// Store for later
- transaction->setTransactionInfo( info );
+ transaction->setTransactionInfo( info.release() );
- return transaction;
+ return transaction.release();
} catch( ConnectorException& ex ){
try{ transport->close(); } catch( ... ){}
@@ -1284,40 +1186,23 @@
void OpenWireConnector::unsubscribe( const std::string& name )
throw ( ConnectorException, UnsupportedOperationException ) {
- commands::RemoveSubscriptionInfo* rsi = NULL;
-
try {
enforceConnected();
- rsi = new commands::RemoveSubscriptionInfo();
+ std::auto_ptr<commands::RemoveSubscriptionInfo> rsi(
+ new commands::RemoveSubscriptionInfo() );
+
rsi->setConnectionId(
connectionInfo.getConnectionId()->cloneDataStructure() );
rsi->setSubcriptionName( name );
rsi->setClientId( connectionInfo.getClientId() );
// Send the message to the broker.
- Response* response = syncRequest( rsi );
-
- // The broker did not return an error - this is good.
- // Just discard the response.
- delete response;
-
- } catch( ConnectorException& ex ) {
- delete rsi;
-
- ex.setMark( __FILE__, __LINE__ );
- throw ex;
- } catch( Exception& ex ) {
- delete rsi;
-
- ex.setMark( __FILE__, __LINE__ );
- throw OpenWireConnectorException( ex );
- } catch( ... ) {
- delete rsi;
-
- throw OpenWireConnectorException( __FILE__, __LINE__,
- "caught unknown exception" );
+ syncRequest( rsi.get() );
}
+ AMQ_CATCH_RETHROW( ConnectorException )
+ AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
+ AMQ_CATCHALL_THROW( OpenWireConnectorException )
}
////////////////////////////////////////////////////////////////////////////////
@@ -1540,21 +1425,21 @@
}
////////////////////////////////////////////////////////////////////////////////
-Response* OpenWireConnector::syncRequest( Command* command, unsigned int
timeout )
+void OpenWireConnector::syncRequest( Command* command, unsigned int timeout )
throw ( ConnectorException ) {
try {
- Response* response = NULL;
+ std::auto_ptr<Response> response;
if( timeout == 0 ) {
- response = transport->request( command );
+ response.reset( transport->request( command ) );
} else {
- response = transport->request( command, timeout );
+ response.reset( transport->request( command, timeout ) );
}
commands::ExceptionResponse* exceptionResponse =
- dynamic_cast<commands::ExceptionResponse*>( response );
+ dynamic_cast<commands::ExceptionResponse*>( response.get() );
if( exceptionResponse != NULL ) {
@@ -1564,15 +1449,9 @@
exceptionResponse->getException() );
BrokerException exception( __FILE__, __LINE__, brokerError );
- // Free the response command.
- delete response;
-
// Throw the exception.
throw exception;
}
-
- // Nothing bad happened - just return the response.
- return response;
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( CommandIOException,
OpenWireConnectorException )
@@ -1603,7 +1482,7 @@
try{
commands::RemoveInfo command;
command.setObjectId( objectId->cloneDataStructure() );
- delete this->syncRequest( &command, timeout );
+ this->syncRequest( &command, timeout );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
@@ -1623,11 +1502,7 @@
command.setDestination( tempDestination->cloneDataStructure() );
// Send the message to the broker.
- Response* response = syncRequest(&command);
-
- // The broker did not return an error - this is good.
- // Just discard the response.
- delete response;
+ syncRequest( &command );
// Now that its setup, link it to this Connector
tempDestination->setConnector( this );
@@ -1651,11 +1526,7 @@
tempDestination->cloneDataStructure() );
// Send the message to the broker.
- Response* response = syncRequest(&command);
-
- // The broker did not return an error - this is good.
- // Just discard the response.
- delete response;
+ syncRequest( &command );
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
@@ -1680,12 +1551,12 @@
throw ( ConnectorException ) {
try{
- commands::LocalTransactionId* id = new commands::LocalTransactionId();
+ std::auto_ptr<commands::LocalTransactionId> id( new
commands::LocalTransactionId() );
id->setConnectionId(
connectionInfo.getConnectionId()->cloneDataStructure() );
id->setValue( transactionIds.getNextSequenceId() );
- return id;
+ return id.release();
}
AMQ_CATCH_RETHROW( ConnectorException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h?rev=710198&r1=710197&r2=710198&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
Mon Nov 3 14:36:56 2008
@@ -713,11 +713,10 @@
* Converts any error responses into an exception.
* @param command The request command.
* @param timeout The time to wait for a response, default is zero or
infinite.
- * @returns The response sent from the broker.
* @throws ConnectorException thrown if an error response was received
* from the broker, or if any other error occurred.
*/
- transport::Response* syncRequest( transport::Command* command,
unsigned int timeout = 0 )
+ void syncRequest( transport::Command* command, unsigned int timeout =
0 )
throw (ConnectorException);
/**