Author: tabish
Date: Mon Aug 3 13:54:34 2009
New Revision: 800382
URL: http://svn.apache.org/viewvc?rev=800382&view=rev
Log:
Fix for: https://issues.apache.org/activemq/browse/AMQCPP-257
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=800382&r1=800381&r2=800382&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Mon Aug 3 13:54:34 2009
@@ -26,6 +26,7 @@
#include <activemq/commands/Message.h>
#include <activemq/commands/MessageAck.h>
#include <activemq/commands/MessagePull.h>
+#include <activemq/commands/RemoveInfo.h>
#include <activemq/commands/TransactionInfo.h>
#include <activemq/commands/TransactionId.h>
#include <activemq/core/ActiveMQConnection.h>
@@ -232,10 +233,17 @@
// Stop and Wakeup all sync consumers.
unconsumedMessages.close();
- // Remove this Consumer from the Connections set of Dispatchers and then
- // remove it from the Broker.
+ // Remove this Consumer from the Connections set of Dispatchers
this->session->disposeOf( this->consumerInfo->getConsumerId(),
lastDeliveredSequenceId );
+ // Remove at the Broker Side, consumer has been removed from the local
+ // Session and Connection objects so if the remote call to remove throws
+ // it is okay to propagate to the client.
+ Pointer<RemoveInfo> info( new RemoveInfo );
+ info->setObjectId( this->consumerInfo->getConsumerId() );
+ info->setLastDeliveredSequenceId( lastDeliveredSequenceId );
+ this->session->oneway( info );
+
// If we encountered an error, propagate it.
if( haveException ){
error.setMark( __FILE__, __LINE__ );
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp?rev=800382&r1=800381&r2=800382&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp Mon Aug 3 13:54:34 2009
@@ -18,6 +18,7 @@
#include <activemq/core/ActiveMQSession.h>
#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/commands/RemoveInfo.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/exceptions/InvalidStateException.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
@@ -26,6 +27,7 @@
using namespace std;
using namespace activemq;
using namespace activemq::core;
+using namespace activemq::commands;
using namespace activemq::exceptions;
using namespace decaf::util;
using namespace decaf::lang;
@@ -74,6 +76,13 @@
this->session->disposeOf( this->producerInfo->getProducerId() );
this->closed = true;
+
+ // Remove at the Broker Side, if this fails the producer has already
+ // been removed from the session and connection objects so its safe
+ // for an exception to be thrown.
+ Pointer<RemoveInfo> info( new RemoveInfo );
+ info->setObjectId( this->producerInfo->getProducerId() );
+ this->session->oneway( info );
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=800382&r1=800381&r2=800382&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Mon Aug 3 13:54:34 2009
@@ -154,15 +154,15 @@
}
}
+ // Remove this sessions from the connection
+ this->connection->removeSession( this );
+
// Remove this session from the Broker.
Pointer<RemoveInfo> info( new RemoveInfo() );
info->setObjectId( this->sessionInfo->getSessionId() );
info->setLastDeliveredSequenceId( this->lastDeliveredSequenceId );
this->connection->oneway( info );
- // Remove this sessions from the connector
- this->connection->removeSession( this );
-
// Now indicate that this session is closed.
closed = true;
}
@@ -1047,16 +1047,9 @@
// Remove this Id both from the Sessions Map of Consumers and from
// the Connection.
this->connection->removeDispatcher( id );
-
- // Remove at the Broker Side.
- Pointer<RemoveInfo> info( new RemoveInfo );
- info->setObjectId( id );
- info->setLastDeliveredSequenceId( lastDeliveredSequenceId );
- this->connection->oneway( info );
+ this->consumers.remove( id );
this->lastDeliveredSequenceId =
Math::max( this->lastDeliveredSequenceId,
lastDeliveredSequenceId );
-
- this->consumers.remove( id );
}
}
}
@@ -1078,12 +1071,6 @@
if( this->producers.containsKey( id ) ) {
this->connection->removeProducer( id );
-
- // Remove at the Broker Side.
- Pointer<RemoveInfo> info( new RemoveInfo );
- info->setObjectId( id );
- this->connection->oneway( info );
-
this->producers.remove( id );
}
}