Author: nmittler Date: Sat Dec 2 05:39:33 2006 New Revision: 481559 URL: http://svn.apache.org/viewvc?view=rev&rev=481559 Log: [AMQCPP-14] Adding integration test for expiration
Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.cpp incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.h Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.cpp URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.cpp?view=auto&rev=481559 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.cpp (added) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.cpp Sat Dec 2 05:39:33 2006 @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ExpirationTest.h" + +#include <integration/common/IntegrationCommon.h> + +//CPPUNIT_TEST_SUITE_REGISTRATION( integration::expiration::ExpirationTest ); + +#include <sstream> + +#include <activemq/core/ActiveMQConnectionFactory.h> +#include <activemq/exceptions/ActiveMQException.h> +#include <activemq/concurrent/Thread.h> +#include <activemq/connector/stomp/StompConnector.h> +#include <activemq/util/SimpleProperties.h> +#include <activemq/transport/TransportFactory.h> +#include <activemq/util/Guid.h> +#include <activemq/util/SimpleProperties.h> +#include <activemq/util/StringTokenizer.h> +#include <activemq/connector/ConnectorFactoryMap.h> +#include <activemq/network/SocketFactory.h> +#include <activemq/transport/TransportFactory.h> +#include <activemq/network/Socket.h> +#include <activemq/exceptions/NullPointerException.h> +#include <activemq/core/ActiveMQConnection.h> +#include <activemq/core/ActiveMQConsumer.h> +#include <activemq/core/ActiveMQProducer.h> +#include <activemq/util/StringTokenizer.h> +#include <activemq/util/Boolean.h> + +#include <cms/Connection.h> +#include <cms/MessageConsumer.h> +#include <cms/MessageProducer.h> +#include <cms/MessageListener.h> +#include <cms/Startable.h> +#include <cms/Closeable.h> +#include <cms/MessageListener.h> +#include <cms/ExceptionListener.h> +#include <cms/Topic.h> +#include <cms/Queue.h> +#include <cms/TemporaryTopic.h> +#include <cms/TemporaryQueue.h> +#include <cms/Session.h> +#include <cms/BytesMessage.h> +#include <cms/TextMessage.h> +#include <cms/MapMessage.h> +#include <cms/Session.h> + +using namespace activemq::connector::stomp; +using namespace activemq::transport; +using namespace activemq::util; +using namespace std; +using namespace cms; +using namespace activemq; +using namespace activemq::core; +using namespace activemq::util; +using namespace activemq::connector; +using namespace activemq::exceptions; +using namespace activemq::network; +using namespace activemq::transport; +using namespace activemq::concurrent; + +using namespace std; +using namespace integration; +using namespace integration::expiration; +using namespace integration::common; + +class Producer : public Runnable { +private: + + Connection* connection; + Session* session; + Topic* destination; + MessageProducer* producer; + int numMessages; + long long timeToLive; + +public: + + Producer( int numMessages, long long timeToLive ){ + connection = NULL; + session = NULL; + destination = NULL; + producer = NULL; + this->numMessages = numMessages; + this->timeToLive = timeToLive; + } + + virtual ~Producer(){ + cleanup(); + } + + virtual void run() { + try { + // Create a ConnectionFactory + ActiveMQConnectionFactory* connectionFactory = new + ActiveMQConnectionFactory("tcp://localhost:61613"); + + // Create a Connection + connection = connectionFactory->createConnection(); + connection->start(); + + string sss=connection->getClientId(); + cout << sss << endl; + + session = connection->createSession( Session::AUTO_ACKNOWLEDGE); + destination = session->createTopic( "mytopic" ); + + producer = session->createProducer( destination ); + producer->setDeliveryMode( DeliveryMode::PERSISTANT ); + + //unsigned long ttt=getcurt(); + producer->setTimeToLive( 1); + + // Create the Thread Id String + string threadIdStr = Integer::toString( Thread::getId() ); + + // Create a messages + string text = (string)"Hello world! from thread " + threadIdStr; + + for( int ix=0; ix<numMessages; ++ix ){ + TextMessage* message = session->createTextMessage( text ); + producer->send( message ); + delete message; + } + + }catch ( CMSException& e ) { + e.printStackTrace(); + } + } + +private: + + void cleanup(){ + + // Destroy resources. + try{ + if( destination != NULL ) delete destination; + }catch ( CMSException& e ) {} + destination = NULL; + + try{ + if( producer != NULL ) delete producer; + }catch ( CMSException& e ) {} + producer = NULL; + + // Close open resources. + try{ + if( session != NULL ) session->close(); + if( connection != NULL ) connection->close(); + }catch ( CMSException& e ) {} + + try{ + if( session != NULL ) delete session; + }catch ( CMSException& e ) {} + session = NULL; + + try{ + if( connection != NULL ) delete connection; + }catch ( CMSException& e ) {} + connection = NULL; + } +}; + +class Consumer : public MessageListener, public Runnable { + +private: + + Connection* connection; + Session* session; + Topic* destination; + MessageConsumer* consumer; + long waitMillis; + int numReceived; + +public: + + Consumer( long waitMillis ){ + connection = NULL; + session = NULL; + destination = NULL; + consumer = NULL; + this->waitMillis = waitMillis; + numReceived = 0; + } + + virtual ~Consumer(){ + cleanup(); + } + + virtual int getNumReceived() const{ + return numReceived; + } + + virtual void run() { + + try { + + string user,passwd,sID; + user="default"; + passwd=""; + sID="lsgID"; + + // Create a ConnectionFactory + ActiveMQConnectionFactory* connectionFactory = + new ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); + + // Create a Connection + connection = connectionFactory->createConnection(); + delete connectionFactory; + connection->start(); + + // Create a Session + session = connection->createSession( Session::AUTO_ACKNOWLEDGE); + + // Create the destination (Topic or Queue) + destination = session->createTopic( "mytopic?consumer.retroactive=true"); + + consumer = session->createDurableConsumer( destination , user ,"",false); + + consumer->setMessageListener( this ); + + // Sleep while asynchronous messages come in. + Thread::sleep( waitMillis ); + + } catch (CMSException& e) { + e.printStackTrace(); + } + } + + virtual void onMessage( const Message* message ){ + + try + { + const TextMessage* textMessage = + dynamic_cast< const TextMessage* >( message ); + string text = textMessage->getText(); + numReceived++; + } catch (CMSException& e) { + e.printStackTrace(); + } + } + +private: + + void cleanup(){ + + // Destroy resources. + try{ + if( destination != NULL ) delete destination; + }catch (CMSException& e) {} + destination = NULL; + + try{ + if( consumer != NULL ) delete consumer; + }catch (CMSException& e) {} + consumer = NULL; + + // Close open resources. + try{ + if( session != NULL ) session->close(); + if( connection != NULL ) connection->close(); + }catch (CMSException& e) {} + + try{ + if( session != NULL ) delete session; + }catch (CMSException& e) {} + session = NULL; + + try{ + if( connection != NULL ) delete connection; + }catch (CMSException& e) {} + connection = NULL; + } +}; + +void ExpirationTest::testExpired() +{ + Producer producer( 2, 1 ); + Thread producerThread( &producer ); + producerThread.start(); + producerThread.join(); + + Consumer consumer( 2000 ); + Thread consumerThread( &consumer ); + consumerThread.start(); + consumerThread.join(); + + CPPUNIT_ASSERT( consumer.getNumReceived() == 0 ); +} + +void ExpirationTest::testNotExpired() +{ + Producer producer( 2, 2000 ); + Thread producerThread( &producer ); + producerThread.start(); + producerThread.join(); + + Consumer consumer( 2000 ); + Thread consumerThread( &consumer ); + consumerThread.start(); + consumerThread.join(); + + CPPUNIT_ASSERT( consumer.getNumReceived() == 2 ); +} + Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.h?view=auto&rev=481559 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.h (added) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/expiration/ExpirationTest.h Sat Dec 2 05:39:33 2006 @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _INTEGRATION_EXPIRATION_EXPIRATIONTEST_H_ +#define _INTEGRATION_EXPIRATION_EXPIRATIONTEST_H_ + +#include <cppunit/TestFixture.h> +#include <cppunit/extensions/HelperMacros.h> + +#include <activemq/concurrent/Mutex.h> + +#include <cms/MessageListener.h> +#include <cms/ExceptionListener.h> +#include <cms/ConnectionFactory.h> +#include <cms/Connection.h> +#include <cms/Session.h> +#include <cms/MessageProducer.h> + +namespace integration{ +namespace expiration{ + + class ExpirationTest : public CppUnit::TestFixture + { + CPPUNIT_TEST_SUITE( ExpirationTest ); + //CPPUNIT_TEST( testExpired ); + //CPPUNIT_TEST( testNotExpired ); + CPPUNIT_TEST_SUITE_END(); + + public: + + ExpirationTest(){} + virtual ~ExpirationTest(){} + + virtual void testExpired(); + virtual void testNotExpired(); + }; + +}} + +#endif /*_INTEGRATION_EXPIRATION_EXPIRATIONTEST_H_*/