Author: tabish
Date: Sat Aug 23 10:28:18 2008
New Revision: 688378
URL: http://svn.apache.org/viewvc?rev=688378&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-189
Adding a max pending message limit property to this AsyncSendTransport so that
it can be configured not to fill all available memory if the broker stalls.
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.cpp
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.cpp?rev=688378&r1=688377&r2=688378&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.cpp
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.cpp
Sat Aug 23 10:28:18 2008
@@ -33,6 +33,17 @@
this->closed = true;
this->asyncThread = NULL;
+ this->maxBacklog = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+AsyncSendTransport::AsyncSendTransport( Transport* next, unsigned int
maxBacklog, bool own )
+ : TransportFilter( next, own ) {
+
+ std::cout << "Async Transport using max Backlog of :" << maxBacklog <<
std::endl;
+ this->closed = true;
+ this->asyncThread = NULL;
+ this->maxBacklog = maxBacklog;
}
////////////////////////////////////////////////////////////////////////////////
@@ -55,6 +66,12 @@
// in case the client deletes their copy before we get a chance to
// send it.
synchronized( &msgQueue ) {
+
+ while( msgQueue.size() >= this->maxBacklog ) {
+ std::cout << "Max Backlog reached" << std::endl;
+ msgQueue.wait();
+ }
+
msgQueue.push( command->cloneCommand() );
msgQueue.notifyAll();
}
@@ -130,6 +147,10 @@
// get the data
command = msgQueue.pop();
+
+ // Notify the callers that we now have room for at least one
more
+ // message to send.
+ msgQueue.notifyAll();
}
// Dispatch the message
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.h?rev=688378&r1=688377&r2=688378&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.h
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransport.h
Sat Aug 23 10:28:18 2008
@@ -32,6 +32,14 @@
private:
/**
+ * Max pending out-bound messages, this limits the number of
+ * messages that will accumulate if the broker has blocked us or
+ * slowed its reads of our out-bound messages. Default is zero
+ * or unlimited backlog.
+ */
+ unsigned int maxBacklog;
+
+ /**
* Thread to send messages in when oneway is called.
*/
decaf::lang::Thread* asyncThread;
@@ -55,6 +63,14 @@
*/
AsyncSendTransport( Transport* next, bool own = true );
+ /**
+ * Constructor.
+ * @param next - the next Transport in the chain
+ * @param maxBacklog - the max number of pending messages to store.
+ * @param own - true if this filter owns the next and should delete it
+ */
+ AsyncSendTransport( Transport* next, unsigned int maxBacklog, bool own
= true );
+
virtual ~AsyncSendTransport();
/**
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp?rev=688378&r1=688377&r2=688378&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/AsyncSendTransportFactory.cpp
Sat Aug 23 10:28:18 2008
@@ -18,11 +18,14 @@
#include "AsyncSendTransportFactory.h"
#include <activemq/transport/filters/AsyncSendTransport.h>
+#include <decaf/lang/Integer.h>
using namespace activemq;
using namespace activemq::transport;
using namespace activemq::transport::filters;
using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::lang;
////////////////////////////////////////////////////////////////////////////////
TransportFactory& AsyncSendTransportFactory::getInstance() {
@@ -42,7 +45,11 @@
bool own ) throw ( ActiveMQException ) {
try{
- return new AsyncSendTransport( next, own );
+
+ unsigned int maxBacklog = Integer::parseInt(
+ properties.getProperty( "transport.maxAsyncSendBacklog", "0" ) );
+
+ return new AsyncSendTransport( next, maxBacklog, own );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( decaf::lang::Exception, ActiveMQException )