Author: tabish
Date: Tue Apr  9 22:17:05 2013
New Revision: 1466265

URL: http://svn.apache.org/r1466265
Log:
https://issues.apache.org/jira/browse/AMQCPP-472

Modified:
    
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
    
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
    
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h

Modified: 
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: 
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1466265&r1=1466264&r2=1466265&view=diff
==============================================================================
--- 
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
 (original)
+++ 
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
 Tue Apr  9 22:17:05 2013
@@ -26,6 +26,7 @@
 #include <decaf/lang/Integer.h>
 #include <decaf/lang/Long.h>
 #include <decaf/util/HashMap.h>
+#include <decaf/util/Collections.h>
 #include <decaf/util/concurrent/ExecutorService.h>
 #include <decaf/util/concurrent/Executors.h>
 #include <activemq/util/Config.h>
@@ -436,14 +437,14 @@ namespace {
 
     public:
 
-        ClientAckHandler( ActiveMQSessionKernel* session ) : session(session) {
+        ClientAckHandler(ActiveMQSessionKernel* session) : session(session) {
             if (session == NULL) {
                 throw NullPointerException(
                     __FILE__, __LINE__, "Ack Handler Created with NULL 
Session.");
             }
         }
 
-        void acknowledgeMessage(const commands::Message* message AMQCPP_UNUSED 
) {
+        void acknowledgeMessage(const commands::Message* message 
AMQCPP_UNUSED) {
             try {
                 this->session->acknowledge();
             }
@@ -595,7 +596,7 @@ namespace {
         ActiveMQSessionKernel* session;
         Pointer<ActiveMQConsumerKernel> consumer;
         ActiveMQConsumerKernelConfig* impl;
-        LinkedList<Pointer<MessageDispatch> > redeliveries;
+        ArrayList<Pointer<MessageDispatch> > redeliveries;
 
     private:
 
@@ -608,6 +609,7 @@ namespace {
             Runnable(), session(session), consumer(consumer), impl(impl), 
redeliveries() {
 
             this->redeliveries.copy(impl->dispatchedMessages);
+            Collections::reverse(this->redeliveries);
         }
         virtual ~NonBlockingRedeliveryTask() {}
 
@@ -1183,7 +1185,7 @@ void ActiveMQConsumerKernel::afterMessag
                                 
makeAckForAllDeliveredMessages(ActiveMQConstants::ACK_TYPE_CONSUMED);
                             if (ack != NULL) {
                                 this->internal->dispatchedMessages.clear();
-                                session->oneway(ack);
+                                session->sendAck(ack);
                             }
                         }
                     }
@@ -1275,7 +1277,7 @@ void ActiveMQConsumerKernel::ackLater(Po
         // old pending ack being superseded by ack of another type, if is is 
not a delivered
         // ack and hence important, send it now so it is not lost.
         if (oldPendingAck->getAckType() != 
ActiveMQConstants::ACK_TYPE_DELIVERED) {
-            session->oneway(oldPendingAck);
+            session->sendAck(oldPendingAck);
         }
     }
 
@@ -1436,14 +1438,17 @@ void ActiveMQConsumerKernel::rollback() 
                     this->internal->additionalWindowSize - (int) 
this->internal->dispatchedMessages.size());
                 this->internal->redeliveryDelay = 0;
 
+                this->internal->deliveredCounter -= (int) 
internal->dispatchedMessages.size();
+                this->internal->dispatchedMessages.clear();
+
             } else {
 
                 // only redelivery_ack after first delivery
                 if (currentRedeliveryCount > 0) {
-                    Pointer<MessageAck> ack(new MessageAck(lastMsg, 
ActiveMQConstants::ACK_TYPE_POISON,
+                    Pointer<MessageAck> ack(new MessageAck(lastMsg, 
ActiveMQConstants::ACK_TYPE_REDELIVERED,
                                             
this->internal->dispatchedMessages.size()));
                     ack->setFirstMessageId(firstMsgId);
-                    session->oneway(ack);
+                    session->sendAck(ack);
                 }
 
                 if (this->internal->nonBlockingRedelivery) {
@@ -1451,9 +1456,15 @@ void ActiveMQConsumerKernel::rollback() 
                     if (!this->internal->unconsumedMessages->isClosed()) {
                         Pointer<ActiveMQConsumerKernel> self =
                             
this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
+
+                        NonBlockingRedeliveryTask* redeliveryTask =
+                            new NonBlockingRedeliveryTask(session, self, 
this->internal);
+
+                        this->internal->deliveredCounter -= (int) 
internal->dispatchedMessages.size();
+                        this->internal->dispatchedMessages.clear();
+
                         this->session->getScheduler()->executeAfterDelay(
-                            new NonBlockingRedeliveryTask(session, self, 
this->internal),
-                            this->internal->redeliveryDelay);
+                            redeliveryTask, this->internal->redeliveryDelay);
                     }
                 } else {
                     // stop the delivery of messages.
@@ -1465,6 +1476,9 @@ void ActiveMQConsumerKernel::rollback() 
                         
this->internal->unconsumedMessages->enqueueFirst(iter->next());
                     }
 
+                    this->internal->deliveredCounter -= (int) 
internal->dispatchedMessages.size();
+                    this->internal->dispatchedMessages.clear();
+
                     if (internal->redeliveryDelay > 0 && 
!this->internal->unconsumedMessages->isClosed()) {
                         Pointer<ActiveMQConsumerKernel> self =
                             
this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
@@ -1475,8 +1489,6 @@ void ActiveMQConsumerKernel::rollback() 
                     }
                 }
             }
-            this->internal->deliveredCounter -= (int) 
internal->dispatchedMessages.size();
-            this->internal->dispatchedMessages.clear();
         }
     }
 

Modified: 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
URL: 
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp?rev=1466265&r1=1466264&r2=1466265&view=diff
==============================================================================
--- 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
 (original)
+++ 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
 Tue Apr  9 22:17:05 2013
@@ -32,6 +32,7 @@
 #include <decaf/lang/Thread.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/util/LinkedList.h>
+#include <decaf/util/LinkedHashSet.h>
 #include <decaf/util/concurrent/atomic/AtomicInteger.h>
 
 using namespace std;
@@ -52,6 +53,41 @@ using namespace decaf::util::concurrent:
 
////////////////////////////////////////////////////////////////////////////////
 namespace {
 
+    void sendMessages(const std::string& uri, const std::string 
destinationName, int count) {
+        Pointer<ActiveMQConnectionFactory> connectionFactory(new 
ActiveMQConnectionFactory(uri));
+        Pointer<Connection> connection(connectionFactory->createConnection());
+        Pointer<Session> 
session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+        Pointer<Destination> 
destination(session->createQueue(destinationName));
+        Pointer<MessageProducer> 
producer(session->createProducer(destination.get()));
+        for(int i = 0; i < count; ++i) {
+            Pointer<TextMessage> message(session->createTextMessage());
+            producer->send(message.get());
+        }
+        connection->close();
+    }
+
+    void destroyDestination(const std::string& uri, const std::string 
destinationName) {
+        Pointer<ActiveMQConnectionFactory> connectionFactory(new 
ActiveMQConnectionFactory(uri));
+        Pointer<Connection> connection(connectionFactory->createConnection());
+        Pointer<Session> 
session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+        Pointer<Destination> 
destination(session->createQueue(destinationName));
+        Pointer<ActiveMQConnection> amqCon = 
connection.dynamicCast<ActiveMQConnection>();
+        amqCon->destroyDestination(destination.get());
+        connection->close();
+    }
+
+    bool assertTrue(LinkedHashSet< Pointer<MessageId> >& set, int expected) {
+        for (int i = 0; i <= 60; ++i) {
+            if (set.size() == expected) {
+                return true;
+            }
+
+            Thread::sleep(1000);
+        }
+
+        return false;
+    }
+
     class TestProducer : public Thread {
     private:
 
@@ -235,8 +271,10 @@ void OpenwireNonBlockingRedeliveryTest::
 
     const std::string DEST_NAME = "QUEUE.FOO";
 
-    TestProducer producer(getBrokerURL(), DEST_NAME, 500);
-    TestConsumer consumer(getBrokerURL(), DEST_NAME, &messages, 500);
+    destroyDestination(getBrokerURL(), DEST_NAME);
+
+    TestProducer producer(getBrokerURL(), DEST_NAME, 100);
+    TestConsumer consumer(getBrokerURL(), DEST_NAME, &messages, 100);
 
     producer.start();
     consumer.start();
@@ -259,4 +297,369 @@ void OpenwireNonBlockingRedeliveryTest::
     }
 
     CPPUNIT_ASSERT(!ordered);
+    destroyDestination(getBrokerURL(), DEST_NAME);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class ReceivedListener : public cms::MessageListener {
+    private:
+
+        LinkedHashSet< Pointer<MessageId> >* received;
+
+    public:
+
+        ReceivedListener(LinkedHashSet< Pointer<MessageId> >* received) :
+            cms::MessageListener(), received(received) {
+        }
+
+        virtual ~ReceivedListener() {
+        }
+
+        virtual void onMessage(const cms::Message* message) {
+            const commands::Message* amqMessage =
+                dynamic_cast<const commands::Message*>(message);
+
+            received->add(amqMessage->getMessageId());
+        }
+
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void 
OpenwireNonBlockingRedeliveryTest::testMessageDeleiveredWhenNonBlockingEnabled()
 {
+
+    LinkedHashSet< Pointer<MessageId> > received;
+    LinkedHashSet< Pointer<MessageId> > beforeRollback;
+    LinkedHashSet< Pointer<MessageId> > afterRollback;
+
+    const int MSG_COUNT = 100;
+    const std::string destinationName = 
"testMessageDeleiveredWhenNonBlockingEnabled";
+
+    destroyDestination(getBrokerURL(), destinationName);
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(new 
ActiveMQConnectionFactory(getBrokerURL()));
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<Session> 
session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Destination> destination(session->createQueue(destinationName));
+    Pointer<MessageConsumer> 
consumer(session->createConsumer(destination.get()));
+
+    ReceivedListener receivedListener(&received);
+    consumer->setMessageListener(&receivedListener);
+    sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+    connection->start();
+
+    CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", 
assertTrue(received, MSG_COUNT));
+
+    beforeRollback.addAll(received);
+    received.clear();
+    session->rollback();
+
+    CPPUNIT_ASSERT_MESSAGE("Post-Rollack received size incorrect", 
assertTrue(received, MSG_COUNT));
+
+    afterRollback.addAll(received);
+    received.clear();
+
+    CPPUNIT_ASSERT_EQUAL(beforeRollback.size(), afterRollback.size());
+    CPPUNIT_ASSERT(beforeRollback.equals(afterRollback));
+    session->commit();
+    connection->close();
+    destroyDestination(getBrokerURL(), destinationName);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testMessageRedeliveriesAreInOrder() {
+
+    LinkedHashSet< Pointer<MessageId> > received;
+    LinkedHashSet< Pointer<MessageId> > beforeRollback;
+    LinkedHashSet< Pointer<MessageId> > afterRollback;
+
+    const int MSG_COUNT = 100;
+    const std::string destinationName = 
"testMessageDeleiveredWhenNonBlockingEnabled";
+
+    destroyDestination(getBrokerURL(), destinationName);
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(new 
ActiveMQConnectionFactory(getBrokerURL()));
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<Session> 
session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Destination> destination(session->createQueue(destinationName));
+    Pointer<MessageConsumer> 
consumer(session->createConsumer(destination.get()));
+
+    ReceivedListener receivedListener(&received);
+    consumer->setMessageListener(&receivedListener);
+    sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+    connection->start();
+
+    CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", 
assertTrue(received, MSG_COUNT));
+
+    beforeRollback.addAll(received);
+    received.clear();
+    session->rollback();
+
+    CPPUNIT_ASSERT_MESSAGE("Post-Rollack received size incorrect", 
assertTrue(received, MSG_COUNT));
+
+    afterRollback.addAll(received);
+    received.clear();
+
+    CPPUNIT_ASSERT_EQUAL(beforeRollback.size(), afterRollback.size());
+    CPPUNIT_ASSERT(beforeRollback.equals(afterRollback));
+
+    Pointer< Iterator<Pointer<MessageId> > > after(afterRollback.iterator());
+    Pointer< Iterator<Pointer<MessageId> > > before(beforeRollback.iterator());
+
+    while (before->hasNext() && after->hasNext()) {
+        Pointer<MessageId> original = before->next();
+        Pointer<MessageId> rolledBack = after->next();
+
+        long long originalSeq = original->getProducerSequenceId();
+        long long rolledbackSeq = rolledBack->getProducerSequenceId();
+
+        CPPUNIT_ASSERT_EQUAL(originalSeq, rolledbackSeq);
+    }
+
+    session->commit();
+    connection->close();
+    destroyDestination(getBrokerURL(), destinationName);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testMessageDeleiveryDoesntStop() {
+
+    LinkedHashSet< Pointer<MessageId> > received;
+    LinkedHashSet< Pointer<MessageId> > beforeRollback;
+    LinkedHashSet< Pointer<MessageId> > afterRollback;
+
+    const int MSG_COUNT = 100;
+    const std::string destinationName = "testMessageDeleiveryDoesntStop";
+
+    destroyDestination(getBrokerURL(), destinationName);
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(new 
ActiveMQConnectionFactory(getBrokerURL()));
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<Session> 
session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Destination> destination(session->createQueue(destinationName));
+    Pointer<MessageConsumer> 
consumer(session->createConsumer(destination.get()));
+
+    ReceivedListener receivedListener(&received);
+    consumer->setMessageListener(&receivedListener);
+    sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+    connection->start();
+
+    CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", 
assertTrue(received, MSG_COUNT));
+
+    beforeRollback.addAll(received);
+    received.clear();
+    session->rollback();
+
+    sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+    CPPUNIT_ASSERT_MESSAGE("Post-Rollack received size incorrect", 
assertTrue(received, MSG_COUNT * 2));
+
+    afterRollback.addAll(received);
+    received.clear();
+
+    CPPUNIT_ASSERT_EQUAL(beforeRollback.size() * 2, afterRollback.size());
+    session->commit();
+    connection->close();
+
+    destroyDestination(getBrokerURL(), destinationName);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void 
OpenwireNonBlockingRedeliveryTest::testNonBlockingMessageDeleiveryIsDelayed() {
+
+    LinkedHashSet< Pointer<MessageId> > received;
+
+    const int MSG_COUNT = 100;
+    const std::string destinationName = 
"testNonBlockingMessageDeleiveryIsDelayed";
+
+    destroyDestination(getBrokerURL(), destinationName);
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(new 
ActiveMQConnectionFactory(getBrokerURL()));
+    
connectionFactory->getRedeliveryPolicy()->setInitialRedeliveryDelay(TimeUnit::SECONDS.toMillis(10));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<Session> 
session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Destination> destination(session->createQueue(destinationName));
+    Pointer<MessageConsumer> 
consumer(session->createConsumer(destination.get()));
+
+    ReceivedListener receivedListener(&received);
+    consumer->setMessageListener(&receivedListener);
+    sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+    connection->start();
+
+    CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", 
assertTrue(received, MSG_COUNT));
+
+    received.clear();
+    session->rollback();
+
+    TimeUnit::SECONDS.sleep(6);
+    CPPUNIT_ASSERT_MESSAGE("Rollback redelivery was not delayed.", 
received.isEmpty());
+
+    CPPUNIT_ASSERT_MESSAGE("Post-Rollack received size incorrect", 
assertTrue(received, MSG_COUNT));
+
+    session->commit();
+    connection->close();
+
+    destroyDestination(getBrokerURL(), destinationName);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class SomeRollbacksListener : public cms::MessageListener {
+    private:
+
+        int count;
+        Pointer<Session> session;
+        LinkedHashSet< Pointer<MessageId> >* received;
+
+    public:
+
+        SomeRollbacksListener(Pointer<Session> session, LinkedHashSet< 
Pointer<MessageId> >* received) :
+            cms::MessageListener(), count(0), session(session), 
received(received) {
+        }
+
+        virtual ~SomeRollbacksListener() {}
+
+        virtual void onMessage(const cms::Message* message) {
+            const commands::Message* amqMessage =
+                dynamic_cast<const commands::Message*>(message);
+
+            if (++count > 10) {
+                try {
+                    session->rollback();
+                    count = 0;
+                } catch (CMSException& e) {
+                }
+            } else {
+                received->add(amqMessage->getMessageId());
+                try {
+                    session->commit();
+                } catch (CMSException& e) {
+                }
+            }
+        }
+
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void 
OpenwireNonBlockingRedeliveryTest::testNonBlockingMessageDeleiveryWithRollbacks()
 {
+
+    LinkedHashSet< Pointer<MessageId> > received;
+
+    const int MSG_COUNT = 100;
+    const std::string destinationName = 
"testNonBlockingMessageDeleiveryWithRollbacks";
+
+    destroyDestination(getBrokerURL(), destinationName);
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(new 
ActiveMQConnectionFactory(getBrokerURL()));
+    
connectionFactory->getRedeliveryPolicy()->setInitialRedeliveryDelay(TimeUnit::SECONDS.toMillis(10));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<Session> 
session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Destination> destination(session->createQueue(destinationName));
+    Pointer<MessageConsumer> 
consumer(session->createConsumer(destination.get()));
+
+    ReceivedListener receivedListener(&received);
+    consumer->setMessageListener(&receivedListener);
+    sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+
+    connection->start();
+
+    CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", 
assertTrue(received, MSG_COUNT));
+
+    received.clear();
+
+    SomeRollbacksListener newListener(session, &received);
+    consumer->setMessageListener(&newListener);
+
+    session->rollback();
+
+    CPPUNIT_ASSERT_MESSAGE("Post-Rollack received size incorrect", 
assertTrue(received, MSG_COUNT));
+
+    session->commit();
+    connection->close();
+
+    destroyDestination(getBrokerURL(), destinationName);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class RollbacksListener : public cms::MessageListener {
+    private:
+
+        Pointer<Session> session;
+
+    public:
+
+        RollbacksListener(Pointer<Session> session) :
+            cms::MessageListener(), session(session) {
+        }
+
+        virtual ~RollbacksListener() {
+        }
+
+        virtual void onMessage(const cms::Message* message) {
+            try {
+                session->rollback();
+            } catch (CMSException& e) {
+            }
+        }
+
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void 
OpenwireNonBlockingRedeliveryTest::testNonBlockingMessageDeleiveryWithAllRolledBack()
 {
+
+    LinkedHashSet< Pointer<MessageId> > received;
+    LinkedHashSet< Pointer<MessageId> > dlqed;
+
+    const int MSG_COUNT = 100;
+    const std::string destinationName = 
"testNonBlockingMessageDeleiveryWithAllRolledBack";
+
+    destroyDestination(getBrokerURL(), destinationName);
+    destroyDestination(getBrokerURL(), "ActiveMQ.DLQ");
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(new 
ActiveMQConnectionFactory(getBrokerURL()));
+    connectionFactory->getRedeliveryPolicy()->setMaximumRedeliveries(5);
+    
connectionFactory->getRedeliveryPolicy()->setInitialRedeliveryDelay(TimeUnit::SECONDS.toMillis(5));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<Session> 
session(connection->createSession(Session::SESSION_TRANSACTED));
+    Pointer<Destination> destination(session->createQueue(destinationName));
+    Pointer<Destination> dlq(session->createQueue("ActiveMQ.DLQ"));
+    Pointer<MessageConsumer> 
consumer(session->createConsumer(destination.get()));
+    Pointer<MessageConsumer> dlqConsumer(session->createConsumer(dlq.get()));
+
+    ReceivedListener dlqReceivedListener(&dlqed);
+    dlqConsumer->setMessageListener(&dlqReceivedListener);
+
+    ReceivedListener receivedListener(&received);
+    consumer->setMessageListener(&receivedListener);
+
+    sendMessages(getBrokerURL(), destinationName, MSG_COUNT);
+    connection->start();
+
+    CPPUNIT_ASSERT_MESSAGE("Pre-Rollack received size incorrect", 
assertTrue(received, MSG_COUNT));
+
+    session->rollback();
+
+    RollbacksListener rollbackListener(session);
+    consumer->setMessageListener(&rollbackListener);
+
+    CPPUNIT_ASSERT_MESSAGE("Post-Rollack DQL size incorrect", 
assertTrue(dlqed, MSG_COUNT));
+
+    session->commit();
+    connection->close();
+
+    destroyDestination(getBrokerURL(), destinationName);
 }

Modified: 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
URL: 
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h?rev=1466265&r1=1466264&r2=1466265&view=diff
==============================================================================
--- 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
 (original)
+++ 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
 Tue Apr  9 22:17:05 2013
@@ -27,7 +27,13 @@ namespace openwire {
     class OpenwireNonBlockingRedeliveryTest : public MessagePriorityTest {
 
         CPPUNIT_TEST_SUITE( OpenwireNonBlockingRedeliveryTest );
-        CPPUNIT_TEST( testConsumerMessagesAreNotOrdered );
+//        CPPUNIT_TEST( testConsumerMessagesAreNotOrdered );
+//        CPPUNIT_TEST( testMessageDeleiveredWhenNonBlockingEnabled );
+//        CPPUNIT_TEST( testMessageDeleiveredWhenNonBlockingEnabled );
+//        CPPUNIT_TEST( testMessageDeleiveryDoesntStop );
+//        CPPUNIT_TEST( testNonBlockingMessageDeleiveryIsDelayed );
+//        CPPUNIT_TEST( testNonBlockingMessageDeleiveryWithRollbacks );
+        CPPUNIT_TEST( testNonBlockingMessageDeleiveryWithAllRolledBack );
         CPPUNIT_TEST_SUITE_END();
 
     public:
@@ -38,6 +44,12 @@ namespace openwire {
         virtual std::string getBrokerURL() const;
 
         void testConsumerMessagesAreNotOrdered();
+        void testMessageDeleiveredWhenNonBlockingEnabled();
+        void testMessageRedeliveriesAreInOrder();
+        void testMessageDeleiveryDoesntStop();
+        void testNonBlockingMessageDeleiveryIsDelayed();
+        void testNonBlockingMessageDeleiveryWithRollbacks();
+        void testNonBlockingMessageDeleiveryWithAllRolledBack();
 
     };
 


Reply via email to