Author: tabish
Date: Mon Nov 30 22:13:17 2009
New Revision: 885594
URL: http://svn.apache.org/viewvc?rev=885594&view=rev
Log:
Adds additional tests to the Individual Acknowledge tests
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp?rev=885594&r1=885593&r2=885594&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.cpp
Mon Nov 30 22:13:17 2009
@@ -146,3 +146,143 @@
session->close();
}
+
+////////////////////////////////////////////////////////////////////////////////
+void
OpenwireIndividualAckTest::testIndividualAcknowledgeMultiMessages_AcknowledgeFirstTest()
+{
+ Connection* connection = this->cmsProvider->getConnection();
+ connection->start();
+
+ std::auto_ptr<Session> session( connection->createSession(
cms::Session::INDIVIDUAL_ACKNOWLEDGE ) );
+ std::auto_ptr<Destination> queue( session->createTemporaryQueue() );
+ std::auto_ptr<MessageProducer> producer( session->createProducer(
queue.get() ) );
+
+ std::auto_ptr<TextMessage> msg1( session->createTextMessage("test 1") );
+ producer->send( msg1.get() );
+ std::auto_ptr<TextMessage> msg2( session->createTextMessage("test 2") );
+ producer->send( msg2.get() );
+
+ producer->close();
+
+ std::auto_ptr<MessageConsumer> consumer( session->createConsumer(
queue.get() ) );
+
+ // Read the first message
+ std::auto_ptr<Message> recvMsg1( consumer->receive( 2000 ) );
+ CPPUNIT_ASSERT( recvMsg1.get() != NULL );
+ CPPUNIT_ASSERT( msg1->getText() == dynamic_cast<TextMessage*>(
recvMsg1.get() )->getText() );
+
+ // Read the second message
+ std::auto_ptr<Message> recvMsg2( consumer->receive( 2000 ) );
+ CPPUNIT_ASSERT( recvMsg2.get() != NULL );
+ CPPUNIT_ASSERT( msg2->getText() == dynamic_cast<TextMessage*>(
recvMsg2.get() )->getText() );
+
+ // Acknowledge first message
+ recvMsg1->acknowledge();
+
+ consumer->close();
+
+ // Read first message a second time
+ consumer.reset( session->createConsumer( queue.get() ) );
+ std::auto_ptr<Message> recvMsg3( consumer->receive( 2000 ) );
+ CPPUNIT_ASSERT( recvMsg3.get() != NULL );
+ CPPUNIT_ASSERT( msg2->getText() == dynamic_cast<TextMessage*>(
recvMsg3.get() )->getText() );
+
+ // Try to read second message a second time
+ std::auto_ptr<Message> recvMsg4( consumer->receive( 2000 ) );
+ CPPUNIT_ASSERT( recvMsg4.get() == NULL );
+
+ consumer->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireIndividualAckTest::testManyMessageAckedAfterMessageConsumption()
+{
+ int messageCount = 20;
+ std::auto_ptr<Message> msg;
+
+ Connection* connection = this->cmsProvider->getConnection();
+ connection->start();
+
+ std::auto_ptr<Session> session( connection->createSession(
cms::Session::INDIVIDUAL_ACKNOWLEDGE ) );
+ std::auto_ptr<Destination> queue( session->createTemporaryQueue() );
+ std::auto_ptr<MessageProducer> producer( session->createProducer(
queue.get() ) );
+
+ for( int i = 0; i < messageCount; i++ )
+ {
+ msg.reset( session->createTextMessage("msg") );
+ producer->send( msg.get() );
+ }
+
+ // Consume the message...
+ std::auto_ptr<MessageConsumer> consumer( session->createConsumer(
queue.get() ) );
+
+ for( int i = 0; i < messageCount; i++ )
+ {
+ msg.reset( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( msg.get() != NULL );
+ msg->acknowledge();
+ }
+
+ msg.reset( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( msg.get() == NULL );
+
+ // Reset the session.
+ session->close();
+ session.reset( connection->createSession( cms::Session::AUTO_ACKNOWLEDGE )
);
+
+ // Attempt to Consume the message...
+ consumer.reset( session->createConsumer( queue.get() ) );
+ msg.reset( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( msg.get() == NULL );
+ session->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireIndividualAckTest::testManyMessageAckedAfterAllConsumption()
+{
+ int messageCount = 20;
+ std::auto_ptr<Message> msg;
+
+ Connection* connection = this->cmsProvider->getConnection();
+ connection->start();
+
+ std::auto_ptr<Session> session( connection->createSession(
cms::Session::INDIVIDUAL_ACKNOWLEDGE ) );
+ std::auto_ptr<Destination> queue( session->createTemporaryQueue() );
+ std::auto_ptr<MessageProducer> producer( session->createProducer(
queue.get() ) );
+
+ for( int i = 0; i < messageCount; i++ )
+ {
+ msg.reset( session->createTextMessage("msg") );
+ producer->send( msg.get() );
+ }
+
+ // Consume the message...
+ std::auto_ptr<MessageConsumer> consumer( session->createConsumer(
queue.get() ) );
+
+ std::vector<Message*> consumedMessages;
+
+ for( int i = 0; i < messageCount; i++ )
+ {
+ Message* message = consumer->receive( 1000 );
+ CPPUNIT_ASSERT( message != NULL );
+ consumedMessages.push_back( message );
+ }
+
+ for( int i = 0; i < messageCount; i++ )
+ {
+ consumedMessages[i]->acknowledge();
+ delete consumedMessages[i];
+ }
+ msg.reset( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( msg.get() == NULL );
+
+ // Reset the session.
+ session->close();
+ session.reset( connection->createSession( cms::Session::AUTO_ACKNOWLEDGE )
);
+
+ // Attempt to Consume the message...
+ consumer.reset( session->createConsumer( queue.get() ) );
+ msg.reset( consumer->receive( 1000 ) );
+ CPPUNIT_ASSERT( msg.get() == NULL );
+ session->close();
+}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h?rev=885594&r1=885593&r2=885594&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireIndividualAckTest.h
Mon Nov 30 22:13:17 2009
@@ -31,6 +31,9 @@
CPPUNIT_TEST( testAckedMessageAreConsumed );
CPPUNIT_TEST( testLastMessageAcked );
CPPUNIT_TEST( testUnAckedMessageAreNotConsumedOnSessionClose );
+ CPPUNIT_TEST(
testIndividualAcknowledgeMultiMessages_AcknowledgeFirstTest );
+ CPPUNIT_TEST( testManyMessageAckedAfterMessageConsumption );
+ CPPUNIT_TEST( testManyMessageAckedAfterAllConsumption );
CPPUNIT_TEST_SUITE_END();
public:
@@ -41,6 +44,9 @@
void testAckedMessageAreConsumed();
void testLastMessageAcked();
void testUnAckedMessageAreNotConsumedOnSessionClose();
+ void testIndividualAcknowledgeMultiMessages_AcknowledgeFirstTest();
+ void testManyMessageAckedAfterMessageConsumption();
+ void testManyMessageAckedAfterAllConsumption();
virtual std::string getBrokerURL() const {
return
activemq::util::IntegrationCommon::getInstance().getOpenwireURL();