Repository: activemq-cpp Updated Branches: refs/heads/master 78172b2a1 -> 3e09e6663
https://issues.apache.org/jira/browse/AMQCPP-582 Ensure that a new pull request gets sent out. Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/3e09e666 Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/3e09e666 Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/3e09e666 Branch: refs/heads/master Commit: 3e09e6663b4d1f59cb53e57c295fb2d797632913 Parents: 78172b2 Author: Timothy Bish <[email protected]> Authored: Mon Aug 3 14:53:19 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Mon Aug 3 14:53:19 2015 -0400 ---------------------------------------------------------------------- .../src/main/activemq/core/PrefetchPolicy.cpp | 13 +++++ .../src/main/activemq/core/PrefetchPolicy.h | 8 +++ .../core/kernels/ActiveMQConsumerKernel.cpp | 2 + .../test/openwire/OpenwireSimpleTest.cpp | 55 ++++++++++++++++++++ .../activemq/test/openwire/OpenwireSimpleTest.h | 2 + 5 files changed, 80 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/3e09e666/activemq-cpp/src/main/activemq/core/PrefetchPolicy.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/core/PrefetchPolicy.cpp b/activemq-cpp/src/main/activemq/core/PrefetchPolicy.cpp index d9b0af4..115ffc6 100644 --- a/activemq-cpp/src/main/activemq/core/PrefetchPolicy.cpp +++ b/activemq-cpp/src/main/activemq/core/PrefetchPolicy.cpp @@ -33,6 +33,19 @@ PrefetchPolicy::~PrefetchPolicy() { } //////////////////////////////////////////////////////////////////////////////// +void PrefetchPolicy::setAll(int value) { + + try { + this->setDurableTopicPrefetch(value); + this->setQueueBrowserPrefetch(value); + this->setQueuePrefetch(value); + this->setTopicPrefetch(value); + } + DECAF_CATCH_RETHROW(Exception) + DECAF_CATCHALL_THROW(Exception) +} + +//////////////////////////////////////////////////////////////////////////////// void PrefetchPolicy::configure(const decaf::util::Properties& properties) { try { http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/3e09e666/activemq-cpp/src/main/activemq/core/PrefetchPolicy.h ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/core/PrefetchPolicy.h b/activemq-cpp/src/main/activemq/core/PrefetchPolicy.h index 0b4d785..04552de 100644 --- a/activemq-cpp/src/main/activemq/core/PrefetchPolicy.h +++ b/activemq-cpp/src/main/activemq/core/PrefetchPolicy.h @@ -106,6 +106,14 @@ namespace core { virtual int getTopicPrefetch() const = 0; /** + * Sets the prefetch value on all available prefetch configuration options. + * + * @param value + * the prefetch value to set on all prefetch options. + */ + virtual void setAll(int value); + + /** * Given a requested value for a new prefetch limit, compare it against some max * prefetch value and return either the requested value or the maximum allowable * value for prefetch. http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/3e09e666/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp index f97148e..25a08c5 100644 --- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp +++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp @@ -1024,6 +1024,8 @@ decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long if (timeout > 0) { timeout = Math::max(deadline - System::currentTimeMillis(), 0LL); } + + sendPullRequest(timeout); } else if (internal->redeliveryExceeded(dispatch)) { internal->posionAck(dispatch, "dispatch to " + getConsumerId()->toString() + http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/3e09e666/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp index c149dc9..affdcbe 100644 --- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp +++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp @@ -24,6 +24,7 @@ #include <activemq/exceptions/ActiveMQException.h> #include <decaf/util/UUID.h> +#include <decaf/lang/Thread.h> using namespace std; using namespace cms; @@ -34,6 +35,7 @@ using namespace activemq::test::openwire; using namespace activemq::util; using namespace activemq::exceptions; using namespace decaf; +using namespace decaf::lang; using namespace decaf::util; //////////////////////////////////////////////////////////////////////////////// @@ -455,3 +457,56 @@ void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndZeroRedelivery() { amqConnection->destroyDestination(queue.get()); } + +//////////////////////////////////////////////////////////////////////////////// +void OpenwireSimpleTest::testWithZeroConsumerPrefetchWithInFlightExpiration() { + + ActiveMQConnectionFactory factory(getBrokerURL()); + auto_ptr<cms::Connection> connection(factory.createConnection()); + + ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get()); + amqConnection->getPrefetchPolicy()->setAll(0); + + connection->start(); + + { + auto_ptr<cms::Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE)); + auto_ptr<cms::Queue> queue(session->createQueue("testWithZeroConsumerPrefetchWithInFlightExpiration")); + + amqConnection->destroyDestination(queue.get()); + + auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get())); + + auto_ptr<cms::Message> expiredMessage(session->createTextMessage("Expired")); + auto_ptr<cms::Message> validMessage(session->createTextMessage("Valid")); + producer->send(expiredMessage.get(), cms::Message::DEFAULT_DELIVERY_MODE, cms::Message::DEFAULT_MSG_PRIORITY, 2000); + producer->send(validMessage.get()); + session->close(); + } + + auto_ptr<cms::Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED)); + auto_ptr<cms::Queue> queue(session->createQueue("testWithZeroConsumerPrefetchWithInFlightExpiration")); + auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(queue.get())); + + { + auto_ptr<cms::Message> message(consumer->receive(5000)); + CPPUNIT_ASSERT(message.get() != NULL); + TextMessage* received = dynamic_cast<TextMessage*>(message.get()); + CPPUNIT_ASSERT_EQUAL(std::string("Expired"), received->getText()); + } + + session->rollback(); + Thread::sleep(2500); + + { + auto_ptr<cms::Message> message(consumer->receive(5000)); + CPPUNIT_ASSERT(message.get() != NULL); + TextMessage* received = dynamic_cast<TextMessage*>(message.get()); + CPPUNIT_ASSERT_EQUAL(std::string("Valid"), received->getText()); + } + + session->commit(); + session->close(); + + amqConnection->destroyDestination(queue.get()); +} http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/3e09e666/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h index d4d623f..6e721bb 100644 --- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h +++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h @@ -44,6 +44,7 @@ namespace openwire{ CPPUNIT_TEST( testWithZeroConsumerPrefetch2 ); CPPUNIT_TEST( testWithZeroConsumerPrefetchAndNoMessage2 ); CPPUNIT_TEST( testWithZeroConsumerPrefetchAndZeroRedelivery ); + CPPUNIT_TEST( testWithZeroConsumerPrefetchWithInFlightExpiration ); CPPUNIT_TEST( testMapMessageSendToQueue ); CPPUNIT_TEST( testMapMessageSendToTopic ); CPPUNIT_TEST( testDestroyDestination ); @@ -68,6 +69,7 @@ namespace openwire{ void testWithZeroConsumerPrefetch2(); void testWithZeroConsumerPrefetchAndNoMessage2(); void testWithZeroConsumerPrefetchAndZeroRedelivery(); + void testWithZeroConsumerPrefetchWithInFlightExpiration(); void testMapMessageSendToQueue(); void testMapMessageSendToTopic(); void tesstStreamMessage();
