Author: tabish
Date: Mon Oct 4 22:01:23 2010
New Revision: 1004450
URL: http://svn.apache.org/viewvc?rev=1004450&view=rev
Log:
Add some additional error detection code to detect dropped connections quicker.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1004450&r1=1004449&r2=1004450&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
Mon Oct 4 22:01:23 2010
@@ -26,6 +26,7 @@
#include <activemq/core/policies/DefaultRedeliveryPolicy.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/exceptions/BrokerException.h>
+#include <activemq/exceptions/ConnectionFailedException.h>
#include <activemq/util/CMSExceptionSupport.h>
#include <activemq/util/IdGenerator.h>
#include <activemq/transport/failover/FailoverTransport.h>
@@ -122,6 +123,8 @@ namespace core{
Pointer<commands::WireFormatInfo> brokerWireFormatInfo;
Pointer<CountDownLatch> transportInterruptionProcessingComplete;
+ Pointer<Exception> firstFailureError;
+
ConnectionConfig() : clientIDSet( false ),
isConnectionInfoSentToBroker( false ),
userSpecifiedClientID( false ),
@@ -231,7 +234,7 @@ cms::Session* ActiveMQConnection::create
try {
- checkClosed();
+ checkClosedOrFailed();
ensureConnectionInfoSent();
// Create and initialize a new SessionInfo object
@@ -398,7 +401,7 @@ void ActiveMQConnection::start() {
try{
- checkClosed();
+ checkClosedOrFailed();
ensureConnectionInfoSent();
// This starts or restarts the delivery of all incoming messages
@@ -421,7 +424,7 @@ void ActiveMQConnection::stop() {
try {
- checkClosed();
+ checkClosedOrFailed();
// Once current deliveries are done this stops the delivery of any
// new messages.
@@ -521,7 +524,7 @@ void ActiveMQConnection::destroyDestinat
__FILE__, __LINE__, "Destination passed was NULL" );
}
- checkClosed();
+ checkClosedOrFailed();
ensureConnectionInfoSent();
Pointer<DestinationInfo> command( new DestinationInfo() );
@@ -550,7 +553,7 @@ void ActiveMQConnection::destroyDestinat
__FILE__, __LINE__, "Destination passed was NULL" );
}
- checkClosed();
+ checkClosedOrFailed();
ensureConnectionInfoSent();
const ActiveMQDestination* amqDestination =
@@ -663,6 +666,7 @@ void ActiveMQConnection::onException( co
// Mark this Connection as having a Failed transport.
this->transportFailed.set( true );
+ this->config->firstFailureError.reset( ex.clone() );
// Inform the user of the error.
fire( exceptions::ActiveMQException( ex ) );
@@ -726,7 +730,7 @@ void ActiveMQConnection::transportResume
void ActiveMQConnection::oneway( Pointer<Command> command ) {
try {
- checkClosed();
+ checkClosedOrFailed();
this->config->transport->oneway( command );
}
AMQ_CATCH_EXCEPTION_CONVERT( IOException, ActiveMQException )
@@ -740,7 +744,7 @@ void ActiveMQConnection::syncRequest( Po
try {
- checkClosed();
+ checkClosedOrFailed();
Pointer<Response> response;
@@ -779,6 +783,15 @@ void ActiveMQConnection::checkClosed() c
}
////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::checkClosedOrFailed() const {
+
+ checkClosed();
+ if( this->transportFailed.get() == true ) {
+ throw ConnectionFailedException( *this->config->firstFailureError );
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::ensureConnectionInfoSent() {
try{
@@ -866,10 +879,6 @@ void ActiveMQConnection::waitForTranspor
if( cdl != NULL ) {
while( !closed.get() && !transportFailed.get() && cdl->getCount() > 0
) {
-
- //std::cout << "dispatch paused, waiting for outstanding dispatch
interruption processing ("
- // << Integer::toString( cdl->getCount() ) << ") to
complete.." << std::endl;
-
cdl->await( 10, TimeUnit::SECONDS );
}
@@ -882,8 +891,6 @@ void ActiveMQConnection::setTransportInt
Pointer<CountDownLatch> cdl =
this->config->transportInterruptionProcessingComplete;
if( cdl != NULL ) {
-
- //std::cout << "Set Transport interruption processing complete." <<
std::endl;
cdl->countDown();
try {
@@ -899,8 +906,6 @@ void ActiveMQConnection::signalInterrupt
if( cdl->getCount() == 0 ) {
- //std::cout << "Signaling Transport interruption processing complete."
<< std::endl;
-
this->config->transportInterruptionProcessingComplete.reset( NULL );
FailoverTransport* failoverTransport =
dynamic_cast<FailoverTransport*>( this->config->transport->narrow(
typeid( FailoverTransport ) ) );
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1004450&r1=1004449&r2=1004450&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
Mon Oct 4 22:01:23 2010
@@ -652,6 +652,9 @@ namespace core{
// Check for Closed State and Throw an exception if true.
void checkClosed() const;
+ // Check for Closed State and Throw an exception if true.
+ void checkClosedOrFailed() const;
+
// If its not been sent, then send the ConnectionInfo to the Broker.
void ensureConnectionInfoSent();