Author: tabish
Date: Wed Apr  3 20:59:45 2013
New Revision: 1464202

URL: http://svn.apache.org/r1464202
Log:
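Adds integration tests for optimized ack support. Also comments out the
CPPUNIT_TEST_SUITE_REGISTRATION of OpenWireCmsSendWithAsyncCallbackTest in
TestRegistry.cpp.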

Added:
    
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.cpp
   (with props)
    
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.h
   (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
    
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp

Modified: 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
URL: 
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am?rev=1464202&r1=1464201&r2=1464202&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am 
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am 
Wed Apr  3 20:59:45 2013
@@ -47,6 +47,7 @@ cc_sources = \
     activemq/test/openwire/OpenwireMapMessageTest.cpp \
     activemq/test/openwire/OpenwireMessageCompressionTest.cpp \
     activemq/test/openwire/OpenwireMessagePriorityTest.cpp \
+    activemq/test/openwire/OpenwireOptimizedAckTest.cpp \
     activemq/test/openwire/OpenwireQueueBrowserTest.cpp \
     activemq/test/openwire/OpenwireSimpleRollbackTest.cpp \
     activemq/test/openwire/OpenwireSimpleTest.cpp \
@@ -104,6 +105,7 @@ h_sources = \
     activemq/test/openwire/OpenwireMapMessageTest.h \
     activemq/test/openwire/OpenwireMessageCompressionTest.h \
     activemq/test/openwire/OpenwireMessagePriorityTest.h \
+    activemq/test/openwire/OpenwireOptimizedAckTest.h \
     activemq/test/openwire/OpenwireQueueBrowserTest.h \
     activemq/test/openwire/OpenwireSimpleRollbackTest.h \
     activemq/test/openwire/OpenwireSimpleTest.h \

Modified: 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
URL: 
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp?rev=1464202&r1=1464201&r2=1464202&view=diff
==============================================================================
--- 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp 
(original)
+++ 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp 
Wed Apr  3 20:59:45 2013
@@ -29,6 +29,7 @@
 #include "activemq/test/openwire/OpenwireMessageCompressionTest.h"
 #include "activemq/test/openwire/OpenwireMessagePriorityTest.h"
 #include "activemq/test/openwire/OpenwireMapMessageTest.h"
+#include "activemq/test/openwire/OpenwireOptimizedAckTest.h"
 #include "activemq/test/openwire/OpenwireQueueBrowserTest.h"
 #include "activemq/test/openwire/OpenwireSimpleRollbackTest.h"
 #include "activemq/test/openwire/OpenwireSimpleTest.h"
@@ -54,7 +55,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem
 CPPUNIT_TEST_SUITE_REGISTRATION( 
activemq::test::openwire::OpenwireAsyncSenderTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( 
activemq::test::openwire::OpenwireClientAckTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( 
activemq::test::openwire::OpenwireCmsConnectionStartStopTest );
-CPPUNIT_TEST_SUITE_REGISTRATION( 
activemq::test::openwire::OpenWireCmsSendWithAsyncCallbackTest );
+//CPPUNIT_TEST_SUITE_REGISTRATION( 
activemq::test::openwire::OpenWireCmsSendWithAsyncCallbackTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( 
activemq::test::openwire::OpenwireCmsTemplateTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireDurableTest 
);
 CPPUNIT_TEST_SUITE_REGISTRATION( 
activemq::test::openwire::OpenwireExpirationTest );
@@ -64,6 +65,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem
 CPPUNIT_TEST_SUITE_REGISTRATION( 
activemq::test::openwire::OpenwireMessageCompressionTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( 
activemq::test::openwire::OpenwireMessagePriorityTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( 
activemq::test::openwire::OpenwireMapMessageTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( 
activemq::test::openwire::OpenwireOptimizedAckTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( 
activemq::test::openwire::OpenwireQueueBrowserTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( 
activemq::test::openwire::OpenwireSimpleRollbackTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleTest 
);

Added: 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.cpp
URL: 
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.cpp?rev=1464202&view=auto
==============================================================================
--- 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.cpp
 (added)
+++ 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.cpp
 Wed Apr  3 20:59:45 2013
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenwireOptimizedAckTest.h"
+
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/PrefetchPolicy.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Pointer.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::test;
+using namespace activemq::test::openwire;
+using namespace activemq::util;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    /**
+     * Listener that only tallies how many times onMessage() has been invoked.
+     * The delivered message itself is never inspected.
+     */
+    class MyMessageListener : public cms::MessageListener {
+    public:
+
+        virtual ~MyMessageListener() {}
+
+        /** Records one more delivery; the message argument is not used. */
+        virtual void onMessage(const cms::Message* message) {
+            deliveries.incrementAndGet();
+        }
+
+        /** @return the number of onMessage() calls recorded so far. */
+        int getCounter() {
+            return deliveries.get();
+        }
+
+    private:
+
+        // Count of onMessage() invocations.
+        AtomicInteger deliveries;
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Default constructor: the body is empty, so construction is left entirely to the base class.
+OpenwireOptimizedAckTest::OpenwireOptimizedAckTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Destructor: the body is empty; nothing is released here.
+OpenwireOptimizedAckTest::~OpenwireOptimizedAckTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the OpenWire URL reported by IntegrationCommon with the query string
+// "?connection.optimizeAcknowledge=true&cms.prefetchPolicy.all=100" appended.
+std::string OpenwireOptimizedAckTest::getBrokerURL() const {
+    const std::string baseUrl = activemq::util::IntegrationCommon::getInstance().getOpenwireURL();
+    const std::string options = "?connection.optimizeAcknowledge=true&cms.prefetchPolicy.all=100";
+    return baseUrl + options;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Checks that the optimized-ack options supplied through the broker URL and
+// the connection factory are visible on a consumer created from that factory,
+// then sends one message and receives it back.
+void OpenwireOptimizedAckTest::testOptimizedAckSettings() {
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(
+        new ActiveMQConnectionFactory(getBrokerURL()));
+
+    connectionFactory->setOptimizeAcknowledgeTimeOut(500);
+    connectionFactory->setOptimizedAckScheduledAckInterval(1000);
+
+    // The URL returned by getBrokerURL() carries cms.prefetchPolicy.all=100.
+    CPPUNIT_ASSERT_EQUAL(100, connectionFactory->getPrefetchPolicy()->getQueuePrefetch());
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    connection->start();
+    Pointer<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+    Pointer<Destination> destination(session->createQueue("TEST.FOO"));
+
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+
+    // Downcast to the ActiveMQ consumer type to reach the optimized-ack accessors.
+    Pointer<ActiveMQConsumer> amqConsumer = consumer.dynamicCast<ActiveMQConsumer>();
+    CPPUNIT_ASSERT(amqConsumer->isOptimizeAcknowledge());
+    CPPUNIT_ASSERT(amqConsumer->getOptimizedAckScheduledAckInterval() == 1000);
+
+    Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+    producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
+
+    std::string text = std::string() + "Hello world! From: " + Thread::currentThread()->getName();
+    Pointer<TextMessage> message;
+
+    message.reset(session->createTextMessage(text));
+    producer->send(message.get());
+
+    Pointer<Message> received(consumer->receive(5000));
+    CPPUNIT_ASSERT(received != NULL);
+
+    // Sleep past the 1000 ms scheduled-ack interval configured above.
+    // NOTE(review): presumably this lets the scheduled ack run before shutdown;
+    // nothing is asserted afterwards - confirm the intent.
+    Thread::sleep(1200);
+
+    // Shut down explicitly, in the same order the other tests in this file use,
+    // instead of relying on the Pointer destructors alone.
+    producer->close();
+    consumer->close();
+    session->close();
+    connection->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sends 45 messages with a 400 ms time-to-live followed by 60 messages with a
+// 60 s time-to-live, waits one second, then starts the connection and expects
+// the asynchronous listener to be invoked exactly 60 times within roughly 20 s.
+void OpenwireOptimizedAckTest::testOptimizedAckWithExpiredMsgs() {
+
+    // Declared before the connection, session and consumer so that it is
+    // destroyed after them. A failed assertion below throws and skips the
+    // explicit close() calls; with the listener declared later it would be
+    // destroyed while the still-open consumer holds a pointer to it.
+    MyMessageListener listener;
+
+    Pointer<ActiveMQConnectionFactory> connectionFactory(
+        new ActiveMQConnectionFactory(getBrokerURL()));
+
+    Pointer<Connection> connection(connectionFactory->createConnection());
+    Pointer<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+    Pointer<Destination> destination(session->createQueue("TEST.FOO"));
+
+    Pointer<MessageConsumer> consumer(session->createConsumer(destination.get()));
+    Pointer<MessageProducer> producer(session->createProducer(destination.get()));
+    producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
+
+    std::string text = std::string() + "Hello world! From: " + Thread::currentThread()->getName();
+    Pointer<TextMessage> message;
+
+    // Produce msgs that will expire quickly
+    for (int i = 0; i < 45; i++) {
+        message.reset(session->createTextMessage(text));
+        producer->send(message.get(), 1, 1, 400);
+    }
+
+    // Produce msgs that don't expire
+    for (int i = 0; i < 60; i++) {
+        message.reset(session->createTextMessage(text));
+        producer->send(message.get(), 1, 1, 60000);
+    }
+
+    consumer->setMessageListener(&listener);
+    Thread::sleep(1000);  // let the batch of 45 expire.
+    connection->start();
+
+    // Poll once per second, for at most 20 rounds, until 60 deliveries are seen.
+    int cycle = 0;
+    while (cycle++ < 20) {
+        if (listener.getCounter() == 60) {
+            break;
+        }
+        Thread::sleep(1000);
+    }
+
+    // The equality form reports the actual count when the expectation is not met.
+    CPPUNIT_ASSERT_EQUAL_MESSAGE("Should have received 60 messages.", 60, listener.getCounter());
+
+    producer->close();
+    consumer->close();
+    session->close();
+    connection->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sends 45 messages with a 10 ms time-to-live followed by 60 messages with a
+// 30 s time-to-live, pauses 200 ms, and then expects 60 consecutive synchronous
+// receives (2 s timeout each) to return a message.
+void OpenwireOptimizedAckTest::testOptimizedAckWithExpiredMsgsSync() {
+
+    Pointer<ActiveMQConnectionFactory> factory(new ActiveMQConnectionFactory(getBrokerURL()));
+
+    Pointer<Connection> connection(factory->createConnection());
+    connection->start();
+    Pointer<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+    Pointer<Destination> queue(session->createQueue("TEST.FOO"));
+
+    Pointer<MessageConsumer> consumer(session->createConsumer(queue.get()));
+    Pointer<MessageProducer> producer(session->createProducer(queue.get()));
+    producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
+
+    std::string body("Hello world! From: ");
+    body += Thread::currentThread()->getName();
+    Pointer<TextMessage> outgoing;
+
+    const long long shortTtl = 10;
+    const long long longTtl = 30000;
+
+    // First batch: messages that expire almost immediately.
+    for (int sent = 0; sent < 45; ++sent) {
+        outgoing.reset(session->createTextMessage(body));
+        producer->send(outgoing.get(), 1, 1, shortTtl);
+    }
+
+    // Second batch: messages that stay valid for the whole test.
+    for (int sent = 0; sent < 60; ++sent) {
+        outgoing.reset(session->createTextMessage(body));
+        producer->send(outgoing.get(), 1, 1, longTtl);
+    }
+
+    Thread::sleep(200);
+
+    // Each of the 60 receives must yield a message.
+    for (int received = 0; received < 60; ++received) {
+        Pointer<Message> delivered(consumer->receive(2000));
+        CPPUNIT_ASSERT(delivered != NULL);
+    }
+
+    producer->close();
+    consumer->close();
+    session->close();
+    connection->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sends 56 messages with a 30 s time-to-live, then 44 with a 10 ms time-to-live,
+// then 4 more with a 30 s time-to-live; pauses 200 ms and expects 60 consecutive
+// synchronous receives (2 s timeout each) to return a message.
+void OpenwireOptimizedAckTest::testOptimizedAckWithExpiredMsgsSync2() {
+
+    Pointer<ActiveMQConnectionFactory> factory(new ActiveMQConnectionFactory(getBrokerURL()));
+
+    Pointer<Connection> connection(factory->createConnection());
+    connection->start();
+    Pointer<Session> session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+    Pointer<Destination> queue(session->createQueue("TEST.FOO"));
+
+    Pointer<MessageConsumer> consumer(session->createConsumer(queue.get()));
+    Pointer<MessageProducer> producer(session->createProducer(queue.get()));
+    producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
+
+    std::string body("Hello world! From: ");
+    body += Thread::currentThread()->getName();
+    Pointer<TextMessage> outgoing;
+
+    const long long shortTtl = 10;
+    const long long longTtl = 30000;
+
+    // Leading batch of messages that don't expire.
+    for (int sent = 0; sent < 56; ++sent) {
+        outgoing.reset(session->createTextMessage(body));
+        producer->send(outgoing.get(), 1, 1, longTtl);
+    }
+
+    // Middle batch of messages that expire almost immediately.
+    for (int sent = 0; sent < 44; ++sent) {
+        outgoing.reset(session->createTextMessage(body));
+        producer->send(outgoing.get(), 1, 1, shortTtl);
+    }
+
+    // Trailing batch of a few more messages that don't expire.
+    for (int sent = 0; sent < 4; ++sent) {
+        outgoing.reset(session->createTextMessage(body));
+        producer->send(outgoing.get(), 1, 1, longTtl);
+    }
+
+    Thread::sleep(200);
+
+    // Each of the 60 receives must yield a message.
+    for (int received = 0; received < 60; ++received) {
+        Pointer<Message> delivered(consumer->receive(2000));
+        CPPUNIT_ASSERT(delivered != NULL);
+    }
+
+    producer->close();
+    consumer->close();
+    session->close();
+    connection->close();
+}

Propchange: 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.h
URL: 
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.h?rev=1464202&view=auto
==============================================================================
--- 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.h
 (added)
+++ 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.h
 Wed Apr  3 20:59:45 2013
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TEST_OPENWIRE_OPENWIREOPTIMIZEDACKTEST_H_
+#define _ACTIVEMQ_TEST_OPENWIRE_OPENWIREOPTIMIZEDACKTEST_H_
+
+#include <activemq/test/MessagePriorityTest.h>
+
+namespace activemq {
+namespace test {
+namespace openwire {
+
+    /**
+     * CppUnit suite declaring four integration tests for optimized
+     * acknowledgement over OpenWire. The suite macros below register the
+     * tests in the order listed.
+     *
+     * NOTE(review): the class derives from MessagePriorityTest, but nothing
+     * specific to message priority is referenced by the declarations here -
+     * confirm this base class is the intended one.
+     */
+    class OpenwireOptimizedAckTest : public MessagePriorityTest {
+
+        CPPUNIT_TEST_SUITE( OpenwireOptimizedAckTest );
+        CPPUNIT_TEST( testOptimizedAckSettings );
+        CPPUNIT_TEST( testOptimizedAckWithExpiredMsgs );
+        CPPUNIT_TEST( testOptimizedAckWithExpiredMsgsSync );
+        CPPUNIT_TEST( testOptimizedAckWithExpiredMsgsSync2 );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        OpenwireOptimizedAckTest();
+        virtual ~OpenwireOptimizedAckTest();
+
+        /** @return the broker URL the tests of this suite connect with. */
+        virtual std::string getBrokerURL() const;
+
+        // Test cases; each one is registered with CPPUNIT_TEST above.
+        void testOptimizedAckSettings();
+        void testOptimizedAckWithExpiredMsgs();
+        void testOptimizedAckWithExpiredMsgsSync();
+        void testOptimizedAckWithExpiredMsgsSync2();
+
+    };
+
+}}}
+
+#endif /* _ACTIVEMQ_TEST_OPENWIRE_OPENWIREOPTIMIZEDACKTEST_H_ */

Propchange: 
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.h
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to