Repository: activemq-cpp Updated Branches: refs/heads/trunk 6b9bd99a1 -> b8a98a3c0
https://issues.apache.org/jira/browse/AMQCPP-543 Implemented this based on the provided patch with some tweaks. Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/b8a98a3c Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/b8a98a3c Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/b8a98a3c Branch: refs/heads/trunk Commit: b8a98a3c0d8887045ed717c0bbb1c132f383eef5 Parents: 6b9bd99 Author: Timothy Bish <[email protected]> Authored: Thu Aug 14 18:09:46 2014 -0400 Committer: Timothy Bish <[email protected]> Committed: Thu Aug 14 18:09:46 2014 -0400 ---------------------------------------------------------------------- .../src/main/activemq/core/ActiveMQConnection.cpp | 17 ++++++++++++++++- .../src/main/activemq/core/ActiveMQConnection.h | 8 ++++++++ .../core/kernels/ActiveMQProducerKernel.cpp | 7 +++++-- .../activemq/wireformat/openwire/OpenWireFormat.h | 2 +- 4 files changed, 30 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b8a98a3c/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp index ce30892..7cd9061 100644 --- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp +++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp @@ -37,6 +37,7 @@ #include <activemq/util/IdGenerator.h> #include <activemq/transport/failover/FailoverTransport.h> #include <activemq/transport/ResponseCallback.h> +#include <activemq/wireformat/openwire/OpenWireFormat.h> #include <decaf/lang/Math.h> #include <decaf/lang/Boolean.h> @@ -83,6 +84,7 @@ using namespace activemq::exceptions; using namespace activemq::threads; using namespace activemq::transport; using namespace activemq::transport::failover; +using namespace activemq::wireformat::openwire; using namespace decaf; using namespace decaf::io; using namespace decaf::util; @@ -199,6 +201,7 @@ namespace core { Pointer<commands::BrokerInfo> brokerInfo; Pointer<commands::WireFormatInfo> brokerWireFormatInfo; Pointer<AtomicInteger> transportInterruptionProcessingComplete; + Pointer<AtomicInteger> protocolVersion; Pointer<CountDownLatch> brokerInfoReceived; Pointer<AdvisoryConsumer> advisoryConsumer; @@ -286,6 +289,7 @@ namespace core { connectionId->setValue(uniqueId); this->transportInterruptionProcessingComplete.reset(new AtomicInteger()); + this->protocolVersion.reset(new AtomicInteger(OpenWireFormat::MAX_SUPPORTED_VERSION)); this->executor.reset( new ThreadPoolExecutor(1, 1, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>(), @@ -1083,7 +1087,7 @@ void ActiveMQConnection::onCommand(const Pointer<Command> command) { } } else if (command->isWireFormatInfo()) { - this->config->brokerWireFormatInfo = command.dynamicCast<WireFormatInfo>(); + this->onWireFormatInfo(command); } else if (command->isBrokerInfo()) { this->config->brokerInfo = command.dynamicCast<BrokerInfo>(); this->config->brokerInfoReceived->countDown(); @@ -1116,6 +1120,12 @@ void ActiveMQConnection::onCommand(const Pointer<Command> command) { } //////////////////////////////////////////////////////////////////////////////// +void ActiveMQConnection::onWireFormatInfo(Pointer<commands::Command> command AMQCPP_UNUSED) { + this->config->brokerWireFormatInfo = command.dynamicCast<WireFormatInfo>(); + this->config->protocolVersion->set(this->config->brokerWireFormatInfo->getVersion()); +} + +//////////////////////////////////////////////////////////////////////////////// void ActiveMQConnection::onControlCommand(Pointer<commands::Command> command AMQCPP_UNUSED) { // Don't need to do anything yet as close and shutdown are applicable yet. } @@ -1909,3 +1919,8 @@ bool ActiveMQConnection::isAlwaysSessionAsync() const { void ActiveMQConnection::setAlwaysSessionAsync(bool alwaysSessionAsync) { this->config->alwaysSessionAsync = alwaysSessionAsync; } + +//////////////////////////////////////////////////////////////////////////////// +int ActiveMQConnection::getProtocolVersion() const { + return this->config->protocolVersion->get(); +} http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b8a98a3c/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h index 08e5d24..12fd1d4 100644 --- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h +++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h @@ -799,6 +799,11 @@ namespace core { */ void setAlwaysSessionAsync(bool alwaysSessionAsync); + /** + * @returns the current connection's OpenWire protocol version. + */ + int getProtocolVersion() const; + public: // TransportListener /** @@ -1066,6 +1071,9 @@ namespace core { // Allow subclasses to access the original Properties object for this connection. const decaf::util::Properties& getProperties() const; + // Process the WireFormatInfo command + void onWireFormatInfo(Pointer<commands::Command> command); + // Process the ControlCommand command void onControlCommand(Pointer<commands::Command> command); http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b8a98a3c/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp index 902f302..848a026 100644 --- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp +++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp @@ -81,8 +81,11 @@ ActiveMQProducerKernel::ActiveMQProducerKernel(ActiveMQSessionKernel* session, this->destination = destination.dynamicCast<cms::Destination>(); } - // TODO - Check for need of MemoryUsage if there's a producer Windows size - // and the Protocol version is greater than 3. + // Enable producer window flow control if protocol >= 3 and the window + // size > 0 + if (session->getConnection()->getProtocolVersion() >= 3 && session->getConnection()->getProducerWindowSize() > 0) { + this->memoryUsage.reset(new MemoryUsage(session->getConnection()->getProducerWindowSize())); + } } //////////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/b8a98a3c/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.h ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.h b/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.h index 8ad0847..e9e76bd 100644 --- a/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.h +++ b/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormat.h @@ -41,7 +41,7 @@ namespace marshal { using decaf::lang::Pointer; class AMQCPP_API OpenWireFormat : public wireformat::WireFormat { - protected: + public: // Declared here to make life easier. static const unsigned char NULL_TYPE;
