Author: tabish
Date: Wed Apr 3 20:59:45 2013
New Revision: 1464202
URL: http://svn.apache.org/r1464202
Log:
Adds integration tests for optimized ack support.
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.cpp
(with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.h
(with props)
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am?rev=1464202&r1=1464201&r2=1464202&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
Wed Apr 3 20:59:45 2013
@@ -47,6 +47,7 @@ cc_sources = \
activemq/test/openwire/OpenwireMapMessageTest.cpp \
activemq/test/openwire/OpenwireMessageCompressionTest.cpp \
activemq/test/openwire/OpenwireMessagePriorityTest.cpp \
+ activemq/test/openwire/OpenwireOptimizedAckTest.cpp \
activemq/test/openwire/OpenwireQueueBrowserTest.cpp \
activemq/test/openwire/OpenwireSimpleRollbackTest.cpp \
activemq/test/openwire/OpenwireSimpleTest.cpp \
@@ -104,6 +105,7 @@ h_sources = \
activemq/test/openwire/OpenwireMapMessageTest.h \
activemq/test/openwire/OpenwireMessageCompressionTest.h \
activemq/test/openwire/OpenwireMessagePriorityTest.h \
+ activemq/test/openwire/OpenwireOptimizedAckTest.h \
activemq/test/openwire/OpenwireQueueBrowserTest.h \
activemq/test/openwire/OpenwireSimpleRollbackTest.h \
activemq/test/openwire/OpenwireSimpleTest.h \
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp?rev=1464202&r1=1464201&r2=1464202&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
Wed Apr 3 20:59:45 2013
@@ -29,6 +29,7 @@
#include "activemq/test/openwire/OpenwireMessageCompressionTest.h"
#include "activemq/test/openwire/OpenwireMessagePriorityTest.h"
#include "activemq/test/openwire/OpenwireMapMessageTest.h"
+#include "activemq/test/openwire/OpenwireOptimizedAckTest.h"
#include "activemq/test/openwire/OpenwireQueueBrowserTest.h"
#include "activemq/test/openwire/OpenwireSimpleRollbackTest.h"
#include "activemq/test/openwire/OpenwireSimpleTest.h"
@@ -54,7 +55,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireAsyncSenderTest );
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireClientAckTest );
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireCmsConnectionStartStopTest );
-CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenWireCmsSendWithAsyncCallbackTest );
+//CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenWireCmsSendWithAsyncCallbackTest );
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireCmsTemplateTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireDurableTest
);
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireExpirationTest );
@@ -64,6 +65,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireMessageCompressionTest );
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireMessagePriorityTest );
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireMapMessageTest );
+CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireOptimizedAckTest );
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireQueueBrowserTest );
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireSimpleRollbackTest );
CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleTest
);
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.cpp?rev=1464202&view=auto
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.cpp
(added)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.cpp
Wed Apr 3 20:59:45 2013
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenwireOptimizedAckTest.h"
+
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/PrefetchPolicy.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Pointer.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::test;
+using namespace activemq::test::openwire;
+using namespace activemq::util;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+ class MyMessageListener : public cms::MessageListener {
+ private:
+
+ AtomicInteger counter;
+
+ public:
+
+ virtual ~MyMessageListener() {}
+
+ virtual void onMessage(const cms::Message* message) {
+ counter.incrementAndGet();
+ }
+
+ int getCounter() {
+ return counter.get();
+ }
+ };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenwireOptimizedAckTest::OpenwireOptimizedAckTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenwireOptimizedAckTest::~OpenwireOptimizedAckTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string OpenwireOptimizedAckTest::getBrokerURL() const {
+ return activemq::util::IntegrationCommon::getInstance().getOpenwireURL() +
+ "?connection.optimizeAcknowledge=true&cms.prefetchPolicy.all=100";
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireOptimizedAckTest::testOptimizedAckSettings() {
+
+ Pointer<ActiveMQConnectionFactory> connectionFactory(
+ new ActiveMQConnectionFactory(getBrokerURL()));
+
+ connectionFactory->setOptimizeAcknowledgeTimeOut(500);
+ connectionFactory->setOptimizedAckScheduledAckInterval(1000);
+
+ CPPUNIT_ASSERT_EQUAL(100,
connectionFactory->getPrefetchPolicy()->getQueuePrefetch());
+
+ Pointer<Connection> connection(connectionFactory->createConnection());
+ connection->start();
+ Pointer<Session>
session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+ Pointer<Destination> destination(session->createQueue("TEST.FOO"));
+
+ Pointer<MessageConsumer>
consumer(session->createConsumer(destination.get()));
+
+ Pointer<ActiveMQConsumer> amqConsumer =
consumer.dynamicCast<ActiveMQConsumer>();
+ CPPUNIT_ASSERT(amqConsumer->isOptimizeAcknowledge());
+ CPPUNIT_ASSERT(amqConsumer->getOptimizedAckScheduledAckInterval() == 1000);
+
+ Pointer<MessageProducer>
producer(session->createProducer(destination.get()));
+ producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
+
+ std::string text = std::string() + "Hello world! From: " +
Thread::currentThread()->getName();
+ Pointer<TextMessage> message;
+
+ message.reset(session->createTextMessage(text));
+ producer->send(message.get());
+
+ Pointer<Message> received(consumer->receive(5000));
+ CPPUNIT_ASSERT(received != NULL);
+
+ Thread::sleep(1200);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireOptimizedAckTest::testOptimizedAckWithExpiredMsgs() {
+
+ Pointer<ActiveMQConnectionFactory> connectionFactory(
+ new ActiveMQConnectionFactory(getBrokerURL()));
+
+ Pointer<Connection> connection(connectionFactory->createConnection());
+ Pointer<Session>
session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+ Pointer<Destination> destination(session->createQueue("TEST.FOO"));
+
+ Pointer<MessageConsumer>
consumer(session->createConsumer(destination.get()));
+ MyMessageListener listener;
+ Pointer<MessageProducer>
producer(session->createProducer(destination.get()));
+ producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
+
+ std::string text = std::string() + "Hello world! From: " +
Thread::currentThread()->getName();
+ Pointer<TextMessage> message;
+
+ // Produce msgs that will expire quickly
+ for (int i=0; i<45; i++) {
+ message.reset(session->createTextMessage(text));
+ producer->send(message.get(), 1, 1, 400);
+ }
+
+ // Produce msgs that don't expire
+ for (int i=0; i<60; i++) {
+ message.reset(session->createTextMessage(text));
+ producer->send(message.get(), 1, 1, 60000);
+ }
+
+ consumer->setMessageListener(&listener);
+ Thread::sleep(1000); // let the batch of 45 expire.
+ connection->start();
+
+ int cycle = 0;
+ while (cycle++ < 20) {
+ if (listener.getCounter() == 60) {
+ break;
+ }
+ Thread::sleep(1000);
+ }
+
+ CPPUNIT_ASSERT_MESSAGE("Should have received 60 messages.",
listener.getCounter() == 60);
+
+ producer->close();
+ consumer->close();
+ session->close();
+ connection->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireOptimizedAckTest::testOptimizedAckWithExpiredMsgsSync() {
+
+ Pointer<ActiveMQConnectionFactory> connectionFactory(
+ new ActiveMQConnectionFactory(getBrokerURL()));
+
+ Pointer<Connection> connection(connectionFactory->createConnection());
+ connection->start();
+ Pointer<Session>
session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+ Pointer<Destination> destination(session->createQueue("TEST.FOO"));
+
+ Pointer<MessageConsumer>
consumer(session->createConsumer(destination.get()));
+ Pointer<MessageProducer>
producer(session->createProducer(destination.get()));
+ producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
+
+ std::string text = std::string() + "Hello world! From: " +
Thread::currentThread()->getName();
+ Pointer<TextMessage> message;
+
+ // Produce msgs that will expire quickly
+ for (int i=0; i<45; i++) {
+ message.reset(session->createTextMessage(text));
+ producer->send(message.get(), 1, 1, 10);
+ }
+
+ // Produce msgs that don't expire
+ for (int i=0; i<60; i++) {
+ message.reset(session->createTextMessage(text));
+ producer->send(message.get(), 1, 1, 30000);
+ }
+
+ Thread::sleep(200);
+
+ for (int counter = 1; counter <= 60; ++counter) {
+ Pointer<Message> message(consumer->receive(2000));
+ CPPUNIT_ASSERT(message != NULL);
+ }
+
+ producer->close();
+ consumer->close();
+ session->close();
+ connection->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireOptimizedAckTest::testOptimizedAckWithExpiredMsgsSync2() {
+
+ Pointer<ActiveMQConnectionFactory> connectionFactory(
+ new ActiveMQConnectionFactory(getBrokerURL()));
+
+ Pointer<Connection> connection(connectionFactory->createConnection());
+ connection->start();
+ Pointer<Session>
session(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+ Pointer<Destination> destination(session->createQueue("TEST.FOO"));
+
+ Pointer<MessageConsumer>
consumer(session->createConsumer(destination.get()));
+ Pointer<MessageProducer>
producer(session->createProducer(destination.get()));
+ producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
+
+ std::string text = std::string() + "Hello world! From: " +
Thread::currentThread()->getName();
+ Pointer<TextMessage> message;
+
+ // Produce msgs that don't expire
+ for (int i=0; i<56; i++) {
+ message.reset(session->createTextMessage(text));
+ producer->send(message.get(), 1, 1, 30000);
+ }
+ // Produce msgs that will expire quickly
+ for (int i=0; i<44; i++) {
+ message.reset(session->createTextMessage(text));
+ producer->send(message.get(), 1, 1, 10);
+ }
+ // Produce some moremsgs that don't expire
+ for (int i=0; i<4; i++) {
+ message.reset(session->createTextMessage(text));
+ producer->send(message.get(), 1, 1, 30000);
+ }
+
+ Thread::sleep(200);
+
+ for (int counter = 1; counter <= 60; ++counter) {
+ Pointer<Message> message(consumer->receive(2000));
+ CPPUNIT_ASSERT(message != NULL);
+ }
+
+ producer->close();
+ consumer->close();
+ session->close();
+ connection->close();
+}
Propchange:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.h?rev=1464202&view=auto
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.h
(added)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.h
Wed Apr 3 20:59:45 2013
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TEST_OPENWIRE_OPENWIREOPTIMIZEDACKTEST_H_
+#define _ACTIVEMQ_TEST_OPENWIRE_OPENWIREOPTIMIZEDACKTEST_H_
+
+#include <activemq/test/MessagePriorityTest.h>
+
+namespace activemq {
+namespace test {
+namespace openwire {
+
+ class OpenwireOptimizedAckTest : public MessagePriorityTest {
+
+ CPPUNIT_TEST_SUITE( OpenwireOptimizedAckTest );
+ CPPUNIT_TEST( testOptimizedAckSettings );
+ CPPUNIT_TEST( testOptimizedAckWithExpiredMsgs );
+ CPPUNIT_TEST( testOptimizedAckWithExpiredMsgsSync );
+ CPPUNIT_TEST( testOptimizedAckWithExpiredMsgsSync2 );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ OpenwireOptimizedAckTest();
+ virtual ~OpenwireOptimizedAckTest();
+
+ virtual std::string getBrokerURL() const;
+
+ void testOptimizedAckSettings();
+ void testOptimizedAckWithExpiredMsgs();
+ void testOptimizedAckWithExpiredMsgsSync();
+ void testOptimizedAckWithExpiredMsgsSync2();
+
+ };
+
+}}}
+
+#endif /* _ACTIVEMQ_TEST_OPENWIRE_OPENWIREOPTIMIZEDACKTEST_H_ */
Propchange:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireOptimizedAckTest.h
------------------------------------------------------------------------------
svn:eol-style = native