Repository: activemq-cpp
Updated Branches:
  refs/heads/master 78172b2a1 -> 3e09e6663


https://issues.apache.org/jira/browse/AMQCPP-582

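Ensure that a new pull request gets sent out after a zero-prefetch consumer drops a message that expired in flight, so that the next message can still be received.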

Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/3e09e666
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/3e09e666
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/3e09e666

Branch: refs/heads/master
Commit: 3e09e6663b4d1f59cb53e57c295fb2d797632913
Parents: 78172b2
Author: Timothy Bish <[email protected]>
Authored: Mon Aug 3 14:53:19 2015 -0400
Committer: Timothy Bish <[email protected]>
Committed: Mon Aug 3 14:53:19 2015 -0400

----------------------------------------------------------------------
 .../src/main/activemq/core/PrefetchPolicy.cpp   | 13 +++++
 .../src/main/activemq/core/PrefetchPolicy.h     |  8 +++
 .../core/kernels/ActiveMQConsumerKernel.cpp     |  2 +
 .../test/openwire/OpenwireSimpleTest.cpp        | 55 ++++++++++++++++++++
 .../activemq/test/openwire/OpenwireSimpleTest.h |  2 +
 5 files changed, 80 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/3e09e666/activemq-cpp/src/main/activemq/core/PrefetchPolicy.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/PrefetchPolicy.cpp b/activemq-cpp/src/main/activemq/core/PrefetchPolicy.cpp
index d9b0af4..115ffc6 100644
--- a/activemq-cpp/src/main/activemq/core/PrefetchPolicy.cpp
+++ b/activemq-cpp/src/main/activemq/core/PrefetchPolicy.cpp
@@ -33,6 +33,19 @@ PrefetchPolicy::~PrefetchPolicy() {
 }
 
 
////////////////////////////////////////////////////////////////////////////////
+void PrefetchPolicy::setAll(int value) {
+
+    try {
+        this->setDurableTopicPrefetch(value);
+        this->setQueueBrowserPrefetch(value);
+        this->setQueuePrefetch(value);
+        this->setTopicPrefetch(value);
+    }
+    DECAF_CATCH_RETHROW(Exception)
+    DECAF_CATCHALL_THROW(Exception)
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void PrefetchPolicy::configure(const decaf::util::Properties& properties) {
 
     try {

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/3e09e666/activemq-cpp/src/main/activemq/core/PrefetchPolicy.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/PrefetchPolicy.h b/activemq-cpp/src/main/activemq/core/PrefetchPolicy.h
index 0b4d785..04552de 100644
--- a/activemq-cpp/src/main/activemq/core/PrefetchPolicy.h
+++ b/activemq-cpp/src/main/activemq/core/PrefetchPolicy.h
@@ -106,6 +106,14 @@ namespace core {
         virtual int getTopicPrefetch() const = 0;
 
         /**
+         * Sets the prefetch value on all available prefetch configuration options.
+         *
+         * @param value
+         *      the prefetch value to set on all prefetch options.
+         */
+        virtual void setAll(int value);
+
+        /**
          * Given a requested value for a new prefetch limit, compare it against some max
          * prefetch value and return either the requested value or the maximum allowable
          * value for prefetch.

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/3e09e666/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
index f97148e..25a08c5 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
@@ -1024,6 +1024,8 @@ decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long
                 if (timeout > 0) {
                     timeout = Math::max(deadline - 
System::currentTimeMillis(), 0LL);
                 }
+
+                sendPullRequest(timeout);
             } else if (internal->redeliveryExceeded(dispatch)) {
                 internal->posionAck(dispatch,
                                     "dispatch to " + 
getConsumerId()->toString() +

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/3e09e666/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
index c149dc9..affdcbe 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
@@ -24,6 +24,7 @@
 #include <activemq/exceptions/ActiveMQException.h>
 
 #include <decaf/util/UUID.h>
+#include <decaf/lang/Thread.h>
 
 using namespace std;
 using namespace cms;
@@ -34,6 +35,7 @@ using namespace activemq::test::openwire;
 using namespace activemq::util;
 using namespace activemq::exceptions;
 using namespace decaf;
+using namespace decaf::lang;
 using namespace decaf::util;
 
 
////////////////////////////////////////////////////////////////////////////////
@@ -455,3 +457,56 @@ void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndZeroRedelivery() {
 
     amqConnection->destroyDestination(queue.get());
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireSimpleTest::testWithZeroConsumerPrefetchWithInFlightExpiration() {
+
+    ActiveMQConnectionFactory factory(getBrokerURL());
+    auto_ptr<cms::Connection> connection(factory.createConnection());
+
+    ActiveMQConnection* amqConnection = 
dynamic_cast<ActiveMQConnection*>(connection.get());
+    amqConnection->getPrefetchPolicy()->setAll(0);
+
+    connection->start();
+
+    {
+        auto_ptr<cms::Session> 
session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE));
+        auto_ptr<cms::Queue> 
queue(session->createQueue("testWithZeroConsumerPrefetchWithInFlightExpiration"));
+
+        amqConnection->destroyDestination(queue.get());
+
+        auto_ptr<cms::MessageProducer> 
producer(session->createProducer(queue.get()));
+
+        auto_ptr<cms::Message> 
expiredMessage(session->createTextMessage("Expired"));
+        auto_ptr<cms::Message> 
validMessage(session->createTextMessage("Valid"));
+        producer->send(expiredMessage.get(), 
cms::Message::DEFAULT_DELIVERY_MODE, cms::Message::DEFAULT_MSG_PRIORITY, 2000);
+        producer->send(validMessage.get());
+        session->close();
+    }
+
+    auto_ptr<cms::Session> 
session(connection->createSession(cms::Session::SESSION_TRANSACTED));
+    auto_ptr<cms::Queue> 
queue(session->createQueue("testWithZeroConsumerPrefetchWithInFlightExpiration"));
+    auto_ptr<cms::MessageConsumer> 
consumer(session->createConsumer(queue.get()));
+
+    {
+        auto_ptr<cms::Message> message(consumer->receive(5000));
+        CPPUNIT_ASSERT(message.get() != NULL);
+        TextMessage* received = dynamic_cast<TextMessage*>(message.get());
+        CPPUNIT_ASSERT_EQUAL(std::string("Expired"), received->getText());
+    }
+
+    session->rollback();
+    Thread::sleep(2500);
+
+    {
+        auto_ptr<cms::Message> message(consumer->receive(5000));
+        CPPUNIT_ASSERT(message.get() != NULL);
+        TextMessage* received = dynamic_cast<TextMessage*>(message.get());
+        CPPUNIT_ASSERT_EQUAL(std::string("Valid"), received->getText());
+    }
+
+    session->commit();
+    session->close();
+
+    amqConnection->destroyDestination(queue.get());
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/3e09e666/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
index d4d623f..6e721bb 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
@@ -44,6 +44,7 @@ namespace openwire{
         CPPUNIT_TEST( testWithZeroConsumerPrefetch2 );
         CPPUNIT_TEST( testWithZeroConsumerPrefetchAndNoMessage2 );
         CPPUNIT_TEST( testWithZeroConsumerPrefetchAndZeroRedelivery );
+        CPPUNIT_TEST( testWithZeroConsumerPrefetchWithInFlightExpiration );
         CPPUNIT_TEST( testMapMessageSendToQueue );
         CPPUNIT_TEST( testMapMessageSendToTopic );
         CPPUNIT_TEST( testDestroyDestination );
@@ -68,6 +69,7 @@ namespace openwire{
         void testWithZeroConsumerPrefetch2();
         void testWithZeroConsumerPrefetchAndNoMessage2();
         void testWithZeroConsumerPrefetchAndZeroRedelivery();
+        void testWithZeroConsumerPrefetchWithInFlightExpiration();
         void testMapMessageSendToQueue();
         void testMapMessageSendToTopic();
         void tesstStreamMessage();

Reply via email to