Author: tabish
Date: Fri Mar 22 19:48:18 2013
New Revision: 1459956
URL: http://svn.apache.org/r1459956
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-470
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LRUCache.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LinkedHashMap.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp Fri Mar 22 19:48:18 2013
@@ -18,7 +18,11 @@
#include "ConnectionStateTracker.h"
#include <decaf/lang/Runnable.h>
+#include <decaf/util/HashCode.h>
+#include <decaf/util/LinkedHashMap.h>
+#include <decaf/util/MapEntry.h>
#include <decaf/util/NoSuchElementException.h>
+#include <decaf/util/concurrent/ConcurrentStlMap.h>
#include <activemq/commands/ConsumerControl.h>
#include <activemq/commands/ExceptionResponse.h>
@@ -38,9 +42,108 @@ using namespace decaf::io;
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
+namespace decaf {
+namespace util {
+
+ template<>
+ struct HashCode<MessageId> : public HashCodeUnaryBase<const MessageId&> {
+ int operator()(const MessageId& arg) const {
+ return decaf::util::HashCode<std::string>()(arg.toString());
+ }
+ };
+
+}}
+
+////////////////////////////////////////////////////////////////////////////////
namespace activemq {
namespace state {
+
+ class MessageCache : public LinkedHashMap<Pointer<MessageId>, Pointer<Command>, HashCode< Pointer<MessageId> > > {
+ protected:
+
+ ConnectionStateTracker* parent;
+
+ public:
+
+ int currentCacheSize;
+
+ public:
+
+ MessageCache(ConnectionStateTracker* parent) :
+ LinkedHashMap<Pointer<MessageId>, Pointer<Command> >(), parent(parent), currentCacheSize(0) {
+ }
+
+ virtual ~MessageCache() {}
+
+ virtual bool removeEldestEntry(const MapEntry<Pointer<MessageId>, Pointer<Command> >& eldest) {
+ bool result = currentCacheSize > parent->getMaxMessageCacheSize();
+ if (result) {
+ Pointer<Message> message = eldest.getValue().dynamicCast<Message>();
+ currentCacheSize -= message->getSize();
+ }
+ return result;
+ }
+ };
+
+ class MessagePullCache : public LinkedHashMap<std::string, Pointer<Command> > {
+ protected:
+
+ ConnectionStateTracker* parent;
+
+ public:
+
+ MessagePullCache(ConnectionStateTracker* parent) :
+ LinkedHashMap<std::string, Pointer<Command> >(), parent(parent) {
+ }
+
+ virtual ~MessagePullCache() {}
+
+ virtual bool removeEldestEntry(const MapEntry<std::string, Pointer<Command> >& eldest AMQCPP_UNUSED) {
+ return size() > parent->getMaxMessagePullCacheSize();
+ }
+ };
+
+ class StateTrackerImpl {
+ private:
+
+ StateTrackerImpl(const StateTrackerImpl&);
+ StateTrackerImpl& operator= (const StateTrackerImpl&);
+
+ public:
+
+ /** Parent ConnectionStateTracker */
+ ConnectionStateTracker* parent;
+
+ /** Creates a unique marker for this state tracker */
+ const Pointer<Tracked> TRACKED_RESPONSE_MARKER;
+
+ /** Map holding the ConnectionStates, indexed by the ConnectionId */
+ ConcurrentStlMap<Pointer<ConnectionId>, Pointer<ConnectionState>, ConnectionId::COMPARATOR> connectionStates;
+
+ /** Store Messages if trackMessages == true */
+ MessageCache messageCache;
+
+ /** Store MessagePull commands for replay */
+ MessagePullCache messagePullCache;
+
+ StateTrackerImpl(ConnectionStateTracker * parent) : parent(parent),
+ TRACKED_RESPONSE_MARKER(new Tracked()),
+ connectionStates(),
+ messageCache(parent),
+ messagePullCache(parent) {
+ }
+
+ ~StateTrackerImpl() {
+ try {
+ connectionStates.clear();
+ messageCache.clear();
+ messagePullCache.clear();
+ }
+ AMQ_CATCHALL_NOTHROW()
+ }
+ };
+
class RemoveTransactionAction : public Runnable {
private:
@@ -62,7 +165,7 @@ namespace state {
virtual void run() {
Pointer<ConnectionId> connectionId = info->getConnectionId();
- Pointer<ConnectionState> cs = stateTracker->connectionStates.get(connectionId);
+ Pointer<ConnectionState> cs = stateTracker->impl->connectionStates.get(connectionId);
Pointer<TransactionState> txState =
cs->removeTransactionState(info->getTransactionId());
if (txState != NULL) {
txState->clear();
@@ -73,10 +176,7 @@ namespace state {
}}
////////////////////////////////////////////////////////////////////////////////
-ConnectionStateTracker::ConnectionStateTracker() : TRACKED_RESPONSE_MARKER( new Tracked() ),
- connectionStates(),
- messageCache(),
- messagePullCache(),
+ConnectionStateTracker::ConnectionStateTracker() : impl(new StateTrackerImpl(this)),
trackTransactions(false),
restoreSessions(true),
restoreConsumers(true),
@@ -84,12 +184,16 @@ ConnectionStateTracker::ConnectionStateT
restoreTransaction(true),
trackMessages(true),
trackTransactionProducers(true),
- maxCacheSize(128 * 1024),
- currentCacheSize(0) {
+ maxMessageCacheSize(128 * 1024),
+ maxMessagePullCacheSize(10) {
}
////////////////////////////////////////////////////////////////////////////////
ConnectionStateTracker::~ConnectionStateTracker() {
+ try {
+ delete impl;
+ }
+ AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
@@ -117,13 +221,7 @@ void ConnectionStateTracker::trackBack(P
if (trackMessages && command->isMessage()) {
Pointer<Message> message = command.dynamicCast<Message>();
if (message->getTransactionId() == NULL) {
- currentCacheSize = currentCacheSize + message->getSize();
- }
- } else {
- Pointer<MessagePull> messagePull = command.dynamicCast<MessagePull>();
- if (messagePull != NULL) {
- // just needs to be a rough estimate of size, ~4 identifiers
- currentCacheSize += 400;
+ this->impl->messageCache.currentCacheSize += message->getSize();
}
}
}
@@ -138,7 +236,9 @@ void ConnectionStateTracker::restore(Poi
try {
- Pointer<Iterator<Pointer<ConnectionState> > > iterator(this->connectionStates.values().iterator());
+ Pointer<Iterator<Pointer<ConnectionState> > > iterator(
+ this->impl->connectionStates.values().iterator());
+
while (iterator->hasNext()) {
Pointer<ConnectionState> state = iterator->next();
@@ -158,12 +258,12 @@ void ConnectionStateTracker::restore(Poi
}
// Now we flush messages
- Pointer<Iterator<Pointer<Command> > > messages(this->messageCache.values().iterator());
+ Pointer<Iterator<Pointer<Command> > > messages(this->impl->messageCache.values().iterator());
while (messages->hasNext()) {
transport->oneway(messages->next());
}
- Pointer<Iterator<Pointer<Command> > > messagePullIter(this->messagePullCache.values().iterator());
+ Pointer<Iterator<Pointer<Command> > > messagePullIter(this->impl->messagePullCache.values().iterator());
while (messagePullIter->hasNext()) {
transport->oneway(messagePullIter->next());
}
@@ -261,7 +361,8 @@ void ConnectionStateTracker::doRestoreCo
try {
// Restore the session's consumers but possibly in pull only (prefetch
0 state) till recovery complete
- Pointer<ConnectionState> connectionState = connectionStates.get(sessionState->getInfo()->getSessionId()->getParentId());
+ Pointer<ConnectionState> connectionState =
+ this->impl->connectionStates.get(sessionState->getInfo()->getSessionId()->getParentId());
bool connectionInterruptionProcessingComplete =
connectionState->isConnectionInterruptProcessingComplete();
Pointer<Iterator<Pointer<ConsumerState> > >
state(sessionState->getConsumerStates().iterator());
@@ -322,12 +423,12 @@ Pointer<Command> ConnectionStateTracker:
try {
if (info != NULL) {
- Pointer<ConnectionState> cs =
connectionStates.get(info->getConnectionId());
+ Pointer<ConnectionState> cs =
this->impl->connectionStates.get(info->getConnectionId());
if (cs != NULL && info->getDestination()->isTemporary()) {
cs->addTempDestination(Pointer<DestinationInfo>(info->cloneDataStructure()));
}
}
- return TRACKED_RESPONSE_MARKER;
+ return this->impl->TRACKED_RESPONSE_MARKER;
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -339,12 +440,12 @@ Pointer<Command> ConnectionStateTracker:
try {
if (info != NULL) {
- Pointer<ConnectionState> cs =
connectionStates.get(info->getConnectionId());
+ Pointer<ConnectionState> cs =
this->impl->connectionStates.get(info->getConnectionId());
if (cs != NULL && info->getDestination()->isTemporary()) {
cs->removeTempDestination(info->getDestination());
}
}
- return TRACKED_RESPONSE_MARKER;
+ return this->impl->TRACKED_RESPONSE_MARKER;
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -360,7 +461,7 @@ Pointer<Command> ConnectionStateTracker:
if (sessionId != NULL) {
Pointer<ConnectionId> connectionId = sessionId->getParentId();
if (connectionId != NULL) {
- Pointer<ConnectionState> cs =
connectionStates.get(connectionId);
+ Pointer<ConnectionState> cs =
this->impl->connectionStates.get(connectionId);
if (cs != NULL) {
Pointer<SessionState> ss =
cs->getSessionState(sessionId);
if (ss != NULL) {
@@ -370,7 +471,7 @@ Pointer<Command> ConnectionStateTracker:
}
}
}
- return TRACKED_RESPONSE_MARKER;
+ return this->impl->TRACKED_RESPONSE_MARKER;
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -386,7 +487,7 @@ Pointer<Command> ConnectionStateTracker:
if (sessionId != NULL) {
Pointer<ConnectionId> connectionId = sessionId->getParentId();
if (connectionId != NULL) {
- Pointer<ConnectionState> cs =
connectionStates.get(connectionId);
+ Pointer<ConnectionState> cs =
this->impl->connectionStates.get(connectionId);
if (cs != NULL) {
Pointer<SessionState> ss =
cs->getSessionState(sessionId);
if (ss != NULL) {
@@ -396,7 +497,7 @@ Pointer<Command> ConnectionStateTracker:
}
}
}
- return TRACKED_RESPONSE_MARKER;
+ return this->impl->TRACKED_RESPONSE_MARKER;
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -413,7 +514,7 @@ Pointer<Command> ConnectionStateTracker:
if (sessionId != NULL) {
Pointer<ConnectionId> connectionId = sessionId->getParentId();
if (connectionId != NULL) {
- Pointer<ConnectionState> cs =
connectionStates.get(connectionId);
+ Pointer<ConnectionState> cs =
this->impl->connectionStates.get(connectionId);
if (cs != NULL) {
Pointer<SessionState> ss =
cs->getSessionState(sessionId);
if (ss != NULL) {
@@ -423,7 +524,7 @@ Pointer<Command> ConnectionStateTracker:
}
}
}
- return TRACKED_RESPONSE_MARKER;
+ return this->impl->TRACKED_RESPONSE_MARKER;
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -439,7 +540,7 @@ Pointer<Command> ConnectionStateTracker:
if (sessionId != NULL) {
Pointer<ConnectionId> connectionId = sessionId->getParentId();
if (connectionId != NULL) {
- Pointer<ConnectionState> cs =
connectionStates.get(connectionId);
+ Pointer<ConnectionState> cs =
this->impl->connectionStates.get(connectionId);
if (cs != NULL) {
Pointer<SessionState> ss =
cs->getSessionState(sessionId);
if (ss != NULL) {
@@ -449,7 +550,7 @@ Pointer<Command> ConnectionStateTracker:
}
}
}
- return TRACKED_RESPONSE_MARKER;
+ return this->impl->TRACKED_RESPONSE_MARKER;
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -464,13 +565,13 @@ Pointer<Command> ConnectionStateTracker:
if (info != NULL) {
Pointer<ConnectionId> connectionId =
info->getSessionId()->getParentId();
if (connectionId != NULL) {
- Pointer<ConnectionState> cs =
connectionStates.get(connectionId);
+ Pointer<ConnectionState> cs =
this->impl->connectionStates.get(connectionId);
if (cs != NULL) {
cs->addSession(Pointer<SessionInfo>(info->cloneDataStructure()));
}
}
}
- return TRACKED_RESPONSE_MARKER;
+ return this->impl->TRACKED_RESPONSE_MARKER;
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -485,13 +586,13 @@ Pointer<Command> ConnectionStateTracker:
if (id != NULL) {
Pointer<ConnectionId> connectionId = id->getParentId();
if (connectionId != NULL) {
- Pointer<ConnectionState> cs =
connectionStates.get(connectionId);
+ Pointer<ConnectionState> cs =
this->impl->connectionStates.get(connectionId);
if (cs != NULL) {
cs->removeSession(Pointer<SessionId>(id->cloneDataStructure()));
}
}
}
- return TRACKED_RESPONSE_MARKER;
+ return this->impl->TRACKED_RESPONSE_MARKER;
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -504,9 +605,10 @@ Pointer<Command> ConnectionStateTracker:
try {
if (info != NULL) {
Pointer<ConnectionInfo> infoCopy(info->cloneDataStructure());
- connectionStates.put(info->getConnectionId(),
Pointer<ConnectionState>(new ConnectionState(infoCopy)));
+ this->impl->connectionStates.put(
+ info->getConnectionId(), Pointer<ConnectionState>(new
ConnectionState(infoCopy)));
}
- return TRACKED_RESPONSE_MARKER;
+ return this->impl->TRACKED_RESPONSE_MARKER;
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -518,10 +620,10 @@ Pointer<Command> ConnectionStateTracker:
try {
if (id != NULL) {
- connectionStates.remove(Pointer<ConnectionId>(id->cloneDataStructure()));
+ this->impl->connectionStates.remove(Pointer<ConnectionId>(id->cloneDataStructure()));
}
- return TRACKED_RESPONSE_MARKER;
+ return this->impl->TRACKED_RESPONSE_MARKER;
}
AMQ_CATCH_RETHROW(ActiveMQException)
AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -539,7 +641,7 @@ Pointer<Command> ConnectionStateTracker:
Pointer<ConnectionId> connectionId =
producerId->getParentId()->getParentId();
if (connectionId != NULL) {
- Pointer<ConnectionState> cs =
connectionStates.get(connectionId);
+ Pointer<ConnectionState> cs =
this->impl->connectionStates.get(connectionId);
if (cs != NULL) {
Pointer<TransactionState> transactionState =
cs->getTransactionState(message->getTransactionId());
if (transactionState != NULL) {
@@ -554,9 +656,10 @@ Pointer<Command> ConnectionStateTracker:
}
}
}
- return TRACKED_RESPONSE_MARKER;
+ return this->impl->TRACKED_RESPONSE_MARKER;
} else if (trackMessages) {
- messageCache.put(message->getMessageId(),
Pointer<Message>(message->cloneDataStructure()));
+ this->impl->messageCache.put(
+ message->getMessageId(),
Pointer<Message>(message->cloneDataStructure()));
}
}
@@ -575,7 +678,7 @@ Pointer<Command> ConnectionStateTracker:
if (trackTransactions && info != NULL) {
Pointer<ConnectionId> connectionId = info->getConnectionId();
if (connectionId != NULL) {
- Pointer<ConnectionState> cs =
connectionStates.get(connectionId);
+ Pointer<ConnectionState> cs =
this->impl->connectionStates.get(connectionId);
if (cs != NULL) {
cs->addTransactionState(info->getTransactionId());
Pointer<TransactionState> transactionState =
cs->getTransactionState(info->getTransactionId());
@@ -583,7 +686,7 @@ Pointer<Command> ConnectionStateTracker:
}
}
- return TRACKED_RESPONSE_MARKER;
+ return this->impl->TRACKED_RESPONSE_MARKER;
}
return Pointer<Response>();
@@ -601,7 +704,7 @@ Pointer<Command> ConnectionStateTracker:
if (trackTransactions && info != NULL) {
Pointer<ConnectionId> connectionId = info->getConnectionId();
if (connectionId != NULL) {
- Pointer<ConnectionState> cs =
connectionStates.get(connectionId);
+ Pointer<ConnectionState> cs =
this->impl->connectionStates.get(connectionId);
if (cs != NULL) {
Pointer<TransactionState> transactionState =
cs->getTransactionState(info->getTransactionId());
if (transactionState != NULL) {
@@ -610,7 +713,7 @@ Pointer<Command> ConnectionStateTracker:
}
}
- return TRACKED_RESPONSE_MARKER;
+ return this->impl->TRACKED_RESPONSE_MARKER;
}
return Pointer<Response>();
@@ -628,7 +731,7 @@ Pointer<Command> ConnectionStateTracker:
if (trackTransactions && info != NULL) {
Pointer<ConnectionId> connectionId = info->getConnectionId();
if (connectionId != NULL) {
- Pointer<ConnectionState> cs =
connectionStates.get(connectionId);
+ Pointer<ConnectionState> cs =
this->impl->connectionStates.get(connectionId);
if (cs != NULL) {
Pointer<TransactionState> transactionState =
cs->getTransactionState(info->getTransactionId());
if (transactionState != NULL) {
@@ -655,7 +758,7 @@ Pointer<Command> ConnectionStateTracker:
if (trackTransactions && info != NULL) {
Pointer<ConnectionId> connectionId = info->getConnectionId();
if (connectionId != NULL) {
- Pointer<ConnectionState> cs =
connectionStates.get(connectionId);
+ Pointer<ConnectionState> cs =
this->impl->connectionStates.get(connectionId);
if (cs != NULL) {
Pointer<TransactionState> transactionState =
cs->getTransactionState(info->getTransactionId());
if (transactionState != NULL) {
@@ -682,7 +785,7 @@ Pointer<Command> ConnectionStateTracker:
if (trackTransactions && info != NULL) {
Pointer<ConnectionId> connectionId = info->getConnectionId();
if (connectionId != NULL) {
- Pointer<ConnectionState> cs =
connectionStates.get(connectionId);
+ Pointer<ConnectionState> cs =
this->impl->connectionStates.get(connectionId);
if (cs != NULL) {
Pointer<TransactionState> transactionState =
cs->getTransactionState(info->getTransactionId());
if (transactionState != NULL) {
@@ -709,7 +812,7 @@ Pointer<Command> ConnectionStateTracker:
if (trackTransactions && info != NULL) {
Pointer<ConnectionId> connectionId = info->getConnectionId();
if (connectionId != NULL) {
- Pointer<ConnectionState> cs =
connectionStates.get(connectionId);
+ Pointer<ConnectionState> cs =
this->impl->connectionStates.get(connectionId);
if (cs != NULL) {
Pointer<TransactionState> transactionState =
cs->getTransactionState(info->getTransactionId());
if (transactionState != NULL) {
@@ -718,7 +821,7 @@ Pointer<Command> ConnectionStateTracker:
}
}
- return TRACKED_RESPONSE_MARKER;
+ return this->impl->TRACKED_RESPONSE_MARKER;
}
return Pointer<Response>();
@@ -735,7 +838,7 @@ Pointer<Command> ConnectionStateTracker:
if (pull != NULL && pull->getDestination() != NULL &&
pull->getConsumerId() != NULL) {
std::string id = pull->getDestination()->toString() + "::" +
pull->getConsumerId()->toString();
- messagePullCache.put(id,
Pointer<Command>(pull->cloneDataStructure()));
+ this->impl->messagePullCache.put(id,
Pointer<Command>(pull->cloneDataStructure()));
}
return Pointer<Command>();
@@ -748,7 +851,7 @@ Pointer<Command> ConnectionStateTracker:
////////////////////////////////////////////////////////////////////////////////
void
ConnectionStateTracker::connectionInterruptProcessingComplete(transport::Transport*
transport, Pointer<ConnectionId> connectionId) {
- Pointer<ConnectionState> connectionState =
connectionStates.get(connectionId);
+ Pointer<ConnectionState> connectionState =
this->impl->connectionStates.get(connectionId);
if (connectionState != NULL) {
@@ -779,7 +882,7 @@ void ConnectionStateTracker::connectionI
////////////////////////////////////////////////////////////////////////////////
void ConnectionStateTracker::transportInterrupted() {
- Pointer<Iterator<Pointer<ConnectionState> > >
state(this->connectionStates.values().iterator());
+ Pointer<Iterator<Pointer<ConnectionState> > >
state(this->impl->connectionStates.values().iterator());
while (state->hasNext()) {
state->next()->setConnectionInterruptProcessingComplete(false);
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h Fri Mar 22 19:48:18 2013
@@ -30,32 +30,18 @@
#include <activemq/state/Tracked.h>
#include <activemq/transport/Transport.h>
-#include <decaf/util/concurrent/ConcurrentStlMap.h>
#include <decaf/lang/Pointer.h>
namespace activemq {
namespace state {
class RemoveTransactionAction;
- using decaf::lang::Pointer;
- using decaf::util::concurrent::ConcurrentStlMap;
+ class StateTrackerImpl;
class AMQCPP_API ConnectionStateTracker: public CommandVisitorAdapter {
private:
- /** Creates a unique marker for this state tracker */
- const Pointer<Tracked> TRACKED_RESPONSE_MARKER;
-
- /** Map holding the ConnectionStates, indexed by the ConnectionId */
- ConcurrentStlMap<Pointer<ConnectionId>, Pointer<ConnectionState>,
ConnectionId::COMPARATOR> connectionStates;
-
- // TODO - The Map doesn't have a way to automatically remove the
eldest Entry
- // Either we need to implement something similar to
LinkedHashMap or find
- // some other way of tracking the eldest entry into the map and
removing it
- // if the cache size is exceeded.
- ConcurrentStlMap<Pointer<MessageId>, Pointer<Command>,
MessageId::COMPARATOR> messageCache;
-
- ConcurrentStlMap<std::string, Pointer<Command> > messagePullCache;
+ StateTrackerImpl* impl;
bool trackTransactions;
bool restoreSessions;
@@ -64,8 +50,8 @@ namespace state {
bool restoreTransaction;
bool trackMessages;
bool trackTransactionProducers;
- int maxCacheSize;
- int currentCacheSize;
+ int maxMessageCacheSize;
+ int maxMessagePullCacheSize;
friend class RemoveTransactionAction;
@@ -77,50 +63,50 @@ namespace state {
Pointer<Tracked> track(Pointer<Command> command);
- void trackBack(Pointer<Command> command);
+ void trackBack(decaf::lang::Pointer<Command> command);
- void restore(Pointer<transport::Transport> transport);
+ void restore(decaf::lang::Pointer<transport::Transport> transport);
void connectionInterruptProcessingComplete(
- transport::Transport* transport, Pointer<ConnectionId>
connectionId);
+ transport::Transport* transport,
decaf::lang::Pointer<ConnectionId> connectionId);
void transportInterrupted();
- virtual Pointer<Command> processDestinationInfo(DestinationInfo* info);
+ virtual decaf::lang::Pointer<Command>
processDestinationInfo(DestinationInfo* info);
- virtual Pointer<Command> processRemoveDestination(DestinationInfo*
info);
+ virtual decaf::lang::Pointer<Command>
processRemoveDestination(DestinationInfo* info);
- virtual Pointer<Command> processProducerInfo(ProducerInfo* info);
+ virtual decaf::lang::Pointer<Command>
processProducerInfo(ProducerInfo* info);
- virtual Pointer<Command> processRemoveProducer(ProducerId* id);
+ virtual decaf::lang::Pointer<Command>
processRemoveProducer(ProducerId* id);
- virtual Pointer<Command> processConsumerInfo(ConsumerInfo* info);
+ virtual decaf::lang::Pointer<Command>
processConsumerInfo(ConsumerInfo* info);
- virtual Pointer<Command> processRemoveConsumer(ConsumerId* id);
+ virtual decaf::lang::Pointer<Command>
processRemoveConsumer(ConsumerId* id);
- virtual Pointer<Command> processSessionInfo(SessionInfo* info);
+ virtual decaf::lang::Pointer<Command> processSessionInfo(SessionInfo*
info);
- virtual Pointer<Command> processRemoveSession(SessionId* id);
+ virtual decaf::lang::Pointer<Command> processRemoveSession(SessionId*
id);
- virtual Pointer<Command> processConnectionInfo(ConnectionInfo* info);
+ virtual decaf::lang::Pointer<Command>
processConnectionInfo(ConnectionInfo* info);
- virtual Pointer<Command> processRemoveConnection(ConnectionId* id);
+ virtual decaf::lang::Pointer<Command>
processRemoveConnection(ConnectionId* id);
- virtual Pointer<Command> processMessage(Message* message);
+ virtual decaf::lang::Pointer<Command> processMessage(Message* message);
- virtual Pointer<Command> processBeginTransaction(TransactionInfo*
info);
+ virtual decaf::lang::Pointer<Command>
processBeginTransaction(TransactionInfo* info);
- virtual Pointer<Command> processPrepareTransaction(TransactionInfo*
info);
+ virtual decaf::lang::Pointer<Command>
processPrepareTransaction(TransactionInfo* info);
- virtual Pointer<Command>
processCommitTransactionOnePhase(TransactionInfo* info);
+ virtual decaf::lang::Pointer<Command>
processCommitTransactionOnePhase(TransactionInfo* info);
- virtual Pointer<Command>
processCommitTransactionTwoPhase(TransactionInfo* info);
+ virtual decaf::lang::Pointer<Command>
processCommitTransactionTwoPhase(TransactionInfo* info);
- virtual Pointer<Command> processRollbackTransaction(TransactionInfo*
info);
+ virtual decaf::lang::Pointer<Command>
processRollbackTransaction(TransactionInfo* info);
- virtual Pointer<Command> processEndTransaction(TransactionInfo* info);
+ virtual decaf::lang::Pointer<Command>
processEndTransaction(TransactionInfo* info);
- virtual Pointer<Command> processMessagePull(MessagePull* pull);
+ virtual decaf::lang::Pointer<Command> processMessagePull(MessagePull*
pull);
bool isRestoreConsumers() const {
return this->restoreConsumers;
@@ -170,12 +156,20 @@ namespace state {
this->trackMessages = trackMessages;
}
- int getMaxCacheSize() const {
- return this->maxCacheSize;
+ int getMaxMessageCacheSize() const {
+ return this->maxMessageCacheSize;
+ }
+
+ void setMaxMessageCacheSize(int maxMessageCacheSize) {
+ this->maxMessageCacheSize = maxMessageCacheSize;
+ }
+
+ int getMaxMessagePullCacheSize() const {
+ return this->maxMessagePullCacheSize;
}
- void setMaxCacheSize(int maxCacheSize) {
- this->maxCacheSize = maxCacheSize;
+ void setMaxMessagePullCacheSize(int maxMessagePullCacheSize) {
+ this->maxMessagePullCacheSize = maxMessagePullCacheSize;
}
bool isTrackTransactionProducers() const {
@@ -188,15 +182,20 @@ namespace state {
private:
- void doRestoreTransactions(Pointer<transport::Transport> transport,
Pointer<ConnectionState> connectionState);
+ void doRestoreTransactions(decaf::lang::Pointer<transport::Transport>
transport,
+ decaf::lang::Pointer<ConnectionState>
connectionState);
- void doRestoreSessions(Pointer<transport::Transport> transport,
Pointer<ConnectionState> connectionState);
+ void doRestoreSessions(decaf::lang::Pointer<transport::Transport>
transport,
+ decaf::lang::Pointer<ConnectionState>
connectionState);
- void doRestoreConsumers(Pointer<transport::Transport> transport,
Pointer<SessionState> sessionState);
+ void doRestoreConsumers(decaf::lang::Pointer<transport::Transport>
transport,
+ decaf::lang::Pointer<SessionState>
sessionState);
- void doRestoreProducers(Pointer<transport::Transport> transport,
Pointer<SessionState> sessionState);
+ void doRestoreProducers(decaf::lang::Pointer<transport::Transport>
transport,
+ decaf::lang::Pointer<SessionState>
sessionState);
- void doRestoreTempDestinations(Pointer<transport::Transport>
transport, Pointer<ConnectionState> connectionState);
+ void
doRestoreTempDestinations(decaf::lang::Pointer<transport::Transport> transport,
+ decaf::lang::Pointer<ConnectionState>
connectionState);
};
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp Fri Mar 22 19:48:18 2013
@@ -86,6 +86,7 @@ namespace failover {
bool trackMessages;
bool trackTransactionProducers;
int maxCacheSize;
+ int maxPullCacheSize;
bool connectionInterruptProcessingComplete;
bool firstConnection;
bool updateURIsSupported;
@@ -134,6 +135,7 @@ namespace failover {
trackMessages(false),
trackTransactionProducers(true),
maxCacheSize(128*1024),
+ maxPullCacheSize(10),
connectionInterruptProcessingComplete(false),
firstConnection(true),
updateURIsSupported(true),
@@ -560,7 +562,8 @@ void FailoverTransport::start() {
}
this->impl->taskRunner->start();
- stateTracker.setMaxCacheSize(this->getMaxCacheSize());
+ stateTracker.setMaxMessageCacheSize(this->getMaxCacheSize());
+ stateTracker.setMaxMessagePullCacheSize(this->getMaxPullCacheSize());
stateTracker.setTrackMessages(this->isTrackMessages());
stateTracker.setTrackTransactionProducers(this->isTrackTransactionProducers());
@@ -1256,6 +1259,16 @@ void FailoverTransport::setMaxCacheSize(
}
////////////////////////////////////////////////////////////////////////////////
+int FailoverTransport::getMaxPullCacheSize() const {
+ return this->impl->maxPullCacheSize;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setMaxPullCacheSize(int value) {
+ this->impl->maxPullCacheSize = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
bool FailoverTransport::isReconnectSupported() const {
return this->impl->reconnectSupported;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h Fri Mar 22 19:48:18 2013
@@ -212,6 +212,10 @@ namespace failover {
void setMaxCacheSize(int value);
+ int getMaxPullCacheSize() const;
+
+ void setMaxPullCacheSize(int value);
+
bool isReconnectSupported() const;
void setReconnectSupported(bool value);
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp Fri Mar 22 19:48:18 2013
@@ -102,6 +102,8 @@ Pointer<Transport> FailoverTransportFact
Boolean::parseBoolean(topLvlProperties.getProperty("trackMessages", "false")));
transport->setMaxCacheSize(
Integer::parseInt(topLvlProperties.getProperty("maxCacheSize",
"131072")));
+ transport->setMaxPullCacheSize(
+ Integer::parseInt(topLvlProperties.getProperty("maxPullCacheSize", "10")));
transport->setUpdateURIsSupported(
Boolean::parseBoolean(topLvlProperties.getProperty("updateURIsSupported",
"true")));
transport->setPriorityBackup(
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LRUCache.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LRUCache.h?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LRUCache.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LRUCache.h Fri Mar 22 19:48:18 2013
@@ -123,7 +123,7 @@ namespace util {
protected:
- virtual bool removeEldestEntry(const MapEntry<K, V>& eldest) {
+ virtual bool removeEldestEntry(const MapEntry<K, V>& eldest DECAF_UNUSED) {
if (this->size() > maxCacheSize) {
return true;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LinkedHashMap.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LinkedHashMap.h?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LinkedHashMap.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/util/LinkedHashMap.h Fri Mar 22 19:48:18 2013
@@ -678,7 +678,7 @@ namespace util {
*
* @return true if the eldest member should be removed.
*/
- virtual bool removeEldestEntry(const MapEntry<K, V>& eldest) {
+ virtual bool removeEldestEntry(const MapEntry<K, V>& eldest DECAF_UNUSED) {
return false;
}
@@ -691,7 +691,7 @@ namespace util {
* @param eldest
* The MapEntry value that is about to be removed from the Map.
*/
- virtual void onEviction(const MapEntry<K, V>& eldest) {}
+ virtual void onEviction(const MapEntry<K, V>& eldest DECAF_UNUSED) {}
public:
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.cpp?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.cpp Fri Mar 22 19:48:18 2013
@@ -17,56 +17,252 @@
#include "ConnectionStateTrackerTest.h"
+#include <activemq/transport/Transport.h>
+#include <activemq/wireformat/WireFormat.h>
#include <activemq/state/ConnectionStateTracker.h>
#include <activemq/state/ConsumerState.h>
#include <activemq/state/SessionState.h>
+#include <activemq/commands/ActiveMQTopic.h>
+#include <activemq/commands/Message.h>
#include <activemq/commands/ConnectionInfo.h>
#include <activemq/commands/SessionInfo.h>
+#include <activemq/commands/Message.h>
#include <decaf/lang/Pointer.h>
+#include <decaf/lang/exceptions/UnsupportedOperationException.h>
+#include <decaf/util/LinkedList.h>
using namespace std;
using namespace activemq;
using namespace activemq::state;
using namespace activemq::commands;
+using namespace activemq::transport;
+using namespace activemq::wireformat;
+using namespace decaf::util;
using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+ class TrackingTransport : public activemq::transport::Transport {
+ public:
+
+ LinkedList< Pointer<Command> > connections;
+ LinkedList< Pointer<Command> > sessions;
+ LinkedList< Pointer<Command> > producers;
+ LinkedList< Pointer<Command> > consumers;
+ LinkedList< Pointer<Command> > messages;
+ LinkedList< Pointer<Command> > messagePulls;
+
+ public:
+
+ virtual ~TrackingTransport() {}
+
+ virtual void start() {}
+
+ virtual void stop() {}
+
+ virtual void close() {}
+
+ virtual void oneway(const Pointer<Command> command) {
+ if (command->isConnectionInfo()) {
+ connections.add(command);
+ } else if (command->isSessionInfo()) {
+ sessions.add(command);
+ } else if (command->isProducerInfo()) {
+ producers.add(command);
+ } else if (command->isConsumerInfo()) {
+ consumers.add(command);
+ } else if (command->isMessage()) {
+ messages.add(command);
+ } else if (command->isMessagePull()) {
+ messagePulls.add(command);
+ }
+ }
+
+ virtual Pointer<FutureResponse> asyncRequest(const Pointer<Command> command,
+ const Pointer<ResponseCallback> responseCallback) {
+ throw UnsupportedOperationException();
+ }
+
+ virtual Pointer<Response> request(const Pointer<Command> command) {
+ throw UnsupportedOperationException();
+ }
+
+ virtual Pointer<Response> request(const Pointer<Command> command, unsigned int timeout) {
+ throw UnsupportedOperationException();
+ }
+
+ virtual Pointer<wireformat::WireFormat> getWireFormat() const {
+ return Pointer<wireformat::WireFormat>();
+ }
+
+ virtual void setWireFormat(const Pointer<wireformat::WireFormat> wireFormat) {
+ }
+
+ virtual void setTransportListener(TransportListener* listener) {
+ }
+
+ virtual TransportListener* getTransportListener() const {
+ return NULL;
+ }
+
+ virtual Transport* narrow(const std::type_info& typeId) {
+ return NULL;
+ }
+
+ virtual bool isFaultTolerant() const {
+ return false;
+ }
+
+ virtual bool isConnected() const {
+ return true;
+ }
+
+ virtual bool isClosed() const {
+ return false;
+ }
+
+ virtual bool isReconnectSupported() const {
+ return false;
+ }
+
+ virtual bool isUpdateURIsSupported() const {
+ return false;
+ }
+
+ virtual std::string getRemoteAddress() const {
+ return "";
+ }
+
+ virtual void reconnect(const decaf::net::URI& uri) {
+ }
+
+ virtual void updateURIs(bool rebalance, const decaf::util::List<decaf::net::URI>& uris) {
+ }
+
+ };
+
+ class ConnectionData {
+ public:
+
+ Pointer<ConnectionInfo> connection;
+ Pointer<SessionInfo> session;
+ Pointer<ConsumerInfo> consumer;
+ Pointer<ProducerInfo> producer;
+
+ };
+
+ ConnectionData createConnectionState(ConnectionStateTracker& tracker) {
+
+ ConnectionData conn;
+
+ Pointer<ConnectionId> connectionId(new ConnectionId);
+ connectionId->setValue("CONNECTION");
+ conn.connection.reset(new ConnectionInfo);
+ conn.connection->setConnectionId(connectionId);
+
+ Pointer<SessionId> session_id(new SessionId);
+ session_id->setConnectionId("CONNECTION");
+ session_id->setValue(12345);
+ conn.session.reset(new SessionInfo);
+ conn.session->setSessionId(session_id);
+
+ Pointer<ConsumerId> consumer_id(new ConsumerId);
+ consumer_id->setConnectionId("CONNECTION");
+ consumer_id->setSessionId(12345);
+ consumer_id->setValue(42);
+ conn.consumer.reset(new ConsumerInfo);
+ conn.consumer->setConsumerId(consumer_id);
+
+ Pointer<ProducerId> producer_id(new ProducerId);
+ producer_id->setConnectionId("CONNECTION");
+ producer_id->setSessionId(12345);
+ producer_id->setValue(42);
+ conn.producer.reset(new ProducerInfo);
+ conn.producer->setProducerId(producer_id);
+
+ tracker.processConnectionInfo(conn.connection.get());
+ tracker.processSessionInfo(conn.session.get());
+ tracker.processConsumerInfo(conn.consumer.get());
+ tracker.processProducerInfo(conn.producer.get());
+
+ return conn;
+ }
+
+ void clearConnectionState(ConnectionStateTracker& tracker, ConnectionData& conn) {
+ tracker.processRemoveProducer(conn.producer->getProducerId().get());
+ tracker.processRemoveConsumer(conn.consumer->getConsumerId().get());
+ tracker.processRemoveSession(conn.session->getSessionId().get());
+ tracker.processRemoveConnection(conn.connection->getConnectionId().get());
+ }
+
+}
////////////////////////////////////////////////////////////////////////////////
void ConnectionStateTrackerTest::test() {
- Pointer<ConnectionId> conn_id( new ConnectionId );
- conn_id->setValue( "CONNECTION" );
- Pointer<ConnectionInfo> conn_info( new ConnectionInfo );
- conn_info->setConnectionId( conn_id );
-
- Pointer<SessionId> session_id( new SessionId );
- session_id->setConnectionId( "CONNECTION" );
- session_id->setValue( 12345 );
- Pointer<SessionInfo> session_info( new SessionInfo );
- session_info->setSessionId( session_id );
-
- Pointer<ConsumerId> consumer_id( new ConsumerId );
- consumer_id->setConnectionId( "CONNECTION" );
- consumer_id->setSessionId( 12345 );
- consumer_id->setValue( 42 );
- Pointer<ConsumerInfo> consumer_info( new ConsumerInfo );
- consumer_info->setConsumerId( consumer_id );
-
- Pointer<ProducerId> producer_id( new ProducerId );
- producer_id->setConnectionId( "CONNECTION" );
- producer_id->setSessionId( 12345 );
- producer_id->setValue( 42 );
- Pointer<ProducerInfo> producer_info( new ProducerInfo );
- producer_info->setProducerId( producer_id );
+ ConnectionStateTracker tracker;
+ ConnectionData conn = createConnectionState(tracker);
+ clearConnectionState(tracker, conn);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionStateTrackerTest::testMessageCache() {
+
+ Pointer<TrackingTransport> transport(new TrackingTransport);
+ ConnectionStateTracker tracker;
+ tracker.setTrackMessages(true);
+
+ ConnectionData conn = createConnectionState(tracker);
+ int messageSize;
+ {
+ decaf::lang::Pointer<commands::MessageId> id(new commands::MessageId());
+ id->setProducerId(conn.producer->getProducerId());
+ Pointer<Message> message(new Message);
+ messageSize = message->getSize();
+ }
+
+ tracker.setMaxMessageCacheSize(messageSize * 3);
+
+ int sequenceId = 1;
+
+ for (int i = 0; i < 100; ++i) {
+ decaf::lang::Pointer<commands::MessageId> id(new commands::MessageId());
+ id->setProducerId(conn.producer->getProducerId());
+ id->setProducerSequenceId(sequenceId++);
+ Pointer<Message> message(new Message);
+ message->setMessageId(id);
+
+ tracker.processMessage(message.get());
+ tracker.trackBack(message);
+ }
+
+ tracker.restore(transport);
+
+ CPPUNIT_ASSERT_EQUAL_MESSAGE("Should only be three messages", 4, transport->messages.size());
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionStateTrackerTest::testMessagePullCache() {
+
+ Pointer<TrackingTransport> transport(new TrackingTransport);
ConnectionStateTracker tracker;
- tracker.processConnectionInfo( conn_info.get() );
- tracker.processSessionInfo( session_info.get() );
- tracker.processConsumerInfo( consumer_info.get() );
- tracker.processProducerInfo( producer_info.get() );
-
- tracker.processRemoveProducer( producer_id.get() );
- tracker.processRemoveConsumer( consumer_id.get() );
- tracker.processRemoveSession( session_id.get() );
- tracker.processRemoveConnection( conn_id.get() );
+ tracker.setTrackMessages(true);
+
+ ConnectionData conn = createConnectionState(tracker);
+
+ for (int i = 0; i < 100; ++i) {
+ Pointer<commands::MessagePull> pull(new commands::MessagePull());
+ Pointer<ActiveMQDestination> destination(new ActiveMQTopic("TEST" + Integer::toString(i)));
+ pull->setConsumerId(conn.consumer->getConsumerId());
+ pull->setDestination(destination);
+ tracker.processMessagePull(pull.get());
+ tracker.trackBack(pull);
+ }
+
+ tracker.restore(transport);
+ CPPUNIT_ASSERT_EQUAL_MESSAGE("Should only be three message pulls", 10, transport->messagePulls.size());
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.h?rev=1459956&r1=1459955&r2=1459956&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/state/ConnectionStateTrackerTest.h Fri Mar 22 19:48:18 2013
@@ -28,6 +28,8 @@ namespace state {
CPPUNIT_TEST_SUITE( ConnectionStateTrackerTest );
CPPUNIT_TEST( test );
+ CPPUNIT_TEST( testMessageCache );
+ CPPUNIT_TEST( testMessagePullCache );
CPPUNIT_TEST_SUITE_END();
public:
@@ -36,6 +38,9 @@ namespace state {
virtual ~ConnectionStateTrackerTest() {}
void test();
+ void testMessageCache();
+ void testMessagePullCache();
+
};
}}