Author: tabish
Date: Wed Mar 14 08:30:33 2007
New Revision: 518180
URL: http://svn.apache.org/viewvc?view=rev&rev=518180
Log:
http://issues.apache.org/activemq/browse/AMQCPP-84
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.h
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp?view=diff&rev=518180&r1=518179&r2=518180
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp
Wed Mar 14 08:30:33 2007
@@ -19,7 +19,7 @@
#include <integration/IntegrationCommon.h>
-CPPUNIT_TEST_SUITE_REGISTRATION(
integration::connector::openwire::OpenwireTempDestinationTest );
+//CPPUNIT_TEST_SUITE_REGISTRATION(
integration::connector::openwire::OpenwireTempDestinationTest );
#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Mutex.h>
@@ -112,9 +112,42 @@
session,
responseTopic );
+ // Launch the Consumers in new Threads.
+ Thread requestorThread( requestConsumer );
+ Thread responderThread( responseConsumer );
+ requestorThread.start();
+ responderThread.start();
+ Thread::sleep( 100 );
+
cms::MessageProducer* producer =
session->createProducer( requestTopic );
+ // Send some bytes messages.
+ testSupport.produceTextMessages(
+ *producer, IntegrationCommon::defaultMsgCount, responseTopic );
+
+ // Let the request consumer get all its messages
+ waitForMessages( *requestConsumer,
+ IntegrationCommon::defaultMsgCount );
+
+ // Check that we got them all.
+ CPPUNIT_ASSERT( requestConsumer->getNumReceived() ==
+ IntegrationCommon::defaultMsgCount );
+
+ // Let the response consumer get all its messages
+ waitForMessages( *responseConsumer,
+ IntegrationCommon::defaultMsgCount );
+
+ // Check that we got them all.
+ CPPUNIT_ASSERT( responseConsumer->getNumReceived() ==
+ IntegrationCommon::defaultMsgCount );
+
+ // Shutdown the Consumer Threads.
+ requestConsumer->stop();
+ responseConsumer->stop();
+ requestorThread.join();
+ responderThread.join();
+
delete producer;
delete requestConsumer;
delete responseConsumer;
@@ -127,6 +160,32 @@
}
///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::waitForMessages(
+ Consumer& consumer,
+ unsigned int count )
+{
+ try
+ {
+ synchronized( &( consumer.getOnMsgMutex() ) )
+ {
+ unsigned int stopAtZero = count + 10;
+
+ while( consumer.getNumReceived() < count )
+ {
+ consumer.getOnMsgMutex().wait( 500 );
+
+ if( --stopAtZero == 0 )
+ {
+ break;
+ }
+ }
+ }
+ }
+ AMQ_CATCH_RETHROW( ActiveMQException )
+ AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+///////////////////////////////////////////////////////////////////////////////
OpenwireTempDestinationTest::Consumer::Consumer(
cms::Connection* connection,
cms::Session* session,
@@ -186,7 +245,25 @@
{
const cms::Destination* replyTo = message->getCMSReplyTo();
+ if( replyTo != NULL ) {
+
+ cms::MessageProducer* producer = session->createProducer( replyTo
);
+ cms::Message* response = session->createMessage();
+
+ // Send it back to the replyTo Destination
+ producer->send( response );
+
+ delete response;
+ delete producer;
+ }
+
numReceived++;
+
+ // Signal anyone waiting on us getting new messages.
+ synchronized( &onMsgMutex ){
+ onMsgMutex.notifyAll();
+ }
+
} catch( CMSException& e ) {
e.printStackTrace();
}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.h?view=diff&rev=518180&r1=518179&r2=518180
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.h
Wed Mar 14 08:30:33 2007
@@ -57,8 +57,9 @@
cms::Session* session;
cms::Destination* destination;
cms::MessageConsumer* consumer;
- int numReceived;
+ unsigned int numReceived;
activemq::concurrent::Mutex mutex;
+ activemq::concurrent::Mutex onMsgMutex;
public:
@@ -68,7 +69,11 @@
virtual ~Consumer();
- virtual int getNumReceived() const {
+ virtual activemq::concurrent::Mutex& getOnMsgMutex() {
+ return this->onMsgMutex;
+ }
+
+ virtual unsigned int getNumReceived() const {
return this->numReceived;
}
@@ -76,6 +81,10 @@
virtual void run();
virtual void onMessage( const cms::Message* message );
};
+
+ // Internal Wait method
+ void waitForMessages( Consumer& consumer,
+ unsigned int count );
};