Author: tabish
Date: Fri Mar 13 19:18:53 2009
New Revision: 753355
URL: http://svn.apache.org/viewvc?rev=753355&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-100
Add a test to check that all messages are sent when a transport fails during a
send. Fix source code generator to add in missing isMessage method in
Message.h. Make the FailoverTransport pass on a call to Narrow to a connected
Transport if there is one.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/CommandCodeGeneratorsFactory.java
activemq/activemq-cpp/trunk/src/main/activemq/commands/Message.h
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.cpp
activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.h
Modified:
activemq/activemq-cpp/trunk/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/CommandCodeGeneratorsFactory.java
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/CommandCodeGeneratorsFactory.java?rev=753355&r1=753354&r2=753355&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/CommandCodeGeneratorsFactory.java
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp-openwire-generator/src/main/java/org/apache/activemq/openwire/tool/commands/CommandCodeGeneratorsFactory.java
Fri Mar 13 19:18:53 2009
@@ -43,6 +43,7 @@
commandsWithShortcuts.add( "BrokerInfo" );
commandsWithShortcuts.add( "KeepAliveInfo" );
commandsWithShortcuts.add( "WireFormatInfo" );
+ commandsWithShortcuts.add( "Message" );
commandsWithShortcuts.add( "MessageAck" );
commandsWithShortcuts.add( "ProducerAck" );
commandsWithShortcuts.add( "MessageDispatchNotification" );
Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/Message.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/Message.h?rev=753355&r1=753354&r2=753355&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/Message.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/Message.h Fri Mar 13
19:18:53 2009
@@ -373,6 +373,13 @@
virtual void setBrokerOutTime( long long brokerOutTime );
/**
+ * Answers the isMessage() query for this command.
+ * @return true, always.
+ */
+ virtual bool isMessage() const { return true; }
+
+ /**
* Allows a Visitor to visit this command and return a response to the
* command based on the command type being visited. The command will call
* the proper processXXX method in the visitor.
Modified:
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h?rev=753355&r1=753354&r2=753355&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
(original)
+++
activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
Fri Mar 13 19:18:53 2009
@@ -266,6 +266,10 @@
return this;
}
+ if( this->connectedTransport != NULL ) {
+ return this->connectedTransport->narrow( typeId );
+ }
+
return NULL;
}
Modified:
activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.cpp?rev=753355&r1=753354&r2=753355&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.cpp
(original)
+++
activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.cpp
Fri Mar 13 19:18:53 2009
@@ -19,13 +19,17 @@
#include <activemq/transport/failover/FailoverTransportFactory.h>
#include <activemq/transport/failover/FailoverTransport.h>
+#include <activemq/transport/mock/MockTransport.h>
#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/commands/ActiveMQMessage.h>
#include <decaf/lang/Pointer.h>
#include <decaf/lang/Thread.h>
using namespace activemq;
+using namespace activemq::commands;
using namespace activemq::transport;
using namespace activemq::transport::failover;
+using namespace activemq::transport::mock;
using namespace activemq::exceptions;
using namespace decaf::lang;
using namespace decaf::util;
@@ -152,3 +156,98 @@
transport->start();
transport->close();
}
+
+////////////////////////////////////////////////////////////////////////////////
+// Transport listener used by the oneway send tests: it keeps a running count
+// of every command passed to onCommand().
+class MessageCountingListener : public DefaultTransportListener {
+public:
+
+    // Number of commands seen so far; read directly by the tests.
+    int numMessages;
+
+    MessageCountingListener() : numMessages( 0 ) {
+    }
+
+    // Counts the command; its contents are never inspected.
+    virtual void onCommand( const Pointer<Command>& command AMQCPP_UNUSED ) {
+        ++numMessages;
+    }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportTest::testSendOnewayMessage() {
+
+    // Sends four oneway messages through a failover transport that wraps a
+    // single mock transport, then checks that the listener attached to the
+    // mock's outgoing side counted all four.
+    std::string uri = "failover://(mock://localhost:61616)?randomize=false";
+
+    Pointer<ActiveMQMessage> message( new ActiveMQMessage() );
+
+    MessageCountingListener messageCounter;
+    DefaultTransportListener listener;
+    FailoverTransportFactory factory;
+
+    Pointer<Transport> transport( factory.create( uri ) );
+    CPPUNIT_ASSERT( transport != NULL );
+    transport->setTransportListener( &listener );
+
+    FailoverTransport* failover = dynamic_cast<FailoverTransport*>(
+        transport->narrow( typeid( FailoverTransport ) ) );
+
+    CPPUNIT_ASSERT( failover != NULL );
+    CPPUNIT_ASSERT( failover->isRandomize() == false );
+
+    transport->start();
+
+    // FailoverTransport::narrow() forwards to the connected transport only
+    // once one exists, so poll for the mock. Pause between polls instead of
+    // spinning, and give up after a bounded number of attempts so that a
+    // connection that never comes up fails the test rather than hanging it.
+    MockTransport* mock = NULL;
+    for( int attempt = 0; mock == NULL && attempt < 500; ++attempt ) {
+        mock = dynamic_cast<MockTransport*>(
+            transport->narrow( typeid( MockTransport ) ) );
+        if( mock == NULL ) {
+            Thread::sleep( 10 );
+        }
+    }
+    CPPUNIT_ASSERT( mock != NULL );
+    mock->setOutgoingListener( &messageCounter );
+
+    transport->oneway( message );
+    transport->oneway( message );
+    transport->oneway( message );
+    transport->oneway( message );
+
+    // Give the transport time to push the queued sends through.
+    Thread::sleep( 1000 );
+
+    // Compare, do not assign: "numMessages = 4" stored 4 into the counter and
+    // always evaluated to true, so the test could never fail.
+    CPPUNIT_ASSERT( messageCounter.numMessages == 4 );
+
+    transport->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransportTest::testSendOnewayMessageFail() {
+
+    // Same scenario as testSendOnewayMessage, but the first mock transport in
+    // the list is created with failOnSendMessage=true and a second mock
+    // transport is available for the failover transport to fall back to.
+    std::string uri =
+        "failover://(mock://localhost:61616?failOnSendMessage=true,"
+        "mock://localhost:61618)?randomize=false";
+
+    Pointer<ActiveMQMessage> message( new ActiveMQMessage() );
+
+    MessageCountingListener messageCounter;
+    DefaultTransportListener listener;
+    FailoverTransportFactory factory;
+
+    Pointer<Transport> transport( factory.create( uri ) );
+    CPPUNIT_ASSERT( transport != NULL );
+    transport->setTransportListener( &listener );
+
+    FailoverTransport* failover = dynamic_cast<FailoverTransport*>(
+        transport->narrow( typeid( FailoverTransport ) ) );
+
+    CPPUNIT_ASSERT( failover != NULL );
+    CPPUNIT_ASSERT( failover->isRandomize() == false );
+
+    transport->start();
+
+    // FailoverTransport::narrow() forwards to the connected transport only
+    // once one exists, so poll for the mock. Pause between polls instead of
+    // spinning, and give up after a bounded number of attempts so that a
+    // connection that never comes up fails the test rather than hanging it.
+    MockTransport* mock = NULL;
+    for( int attempt = 0; mock == NULL && attempt < 500; ++attempt ) {
+        mock = dynamic_cast<MockTransport*>(
+            transport->narrow( typeid( MockTransport ) ) );
+        if( mock == NULL ) {
+            Thread::sleep( 10 );
+        }
+    }
+    CPPUNIT_ASSERT( mock != NULL );
+
+    // NOTE(review): the counter is attached to whichever mock transport is
+    // connected at this moment; confirm that messages delivered through the
+    // second transport after the failover are also seen by this listener.
+    mock->setOutgoingListener( &messageCounter );
+
+    transport->oneway( message );
+    transport->oneway( message );
+    transport->oneway( message );
+    transport->oneway( message );
+
+    // Give the transport time to fail over and push the sends through.
+    Thread::sleep( 1000 );
+
+    // Compare, do not assign: "numMessages = 4" stored 4 into the counter and
+    // always evaluated to true, so the test could never fail.
+    CPPUNIT_ASSERT( messageCounter.numMessages == 4 );
+
+    transport->close();
+}
Modified:
activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.h?rev=753355&r1=753354&r2=753355&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.h
(original)
+++
activemq/activemq-cpp/trunk/src/test/activemq/transport/failover/FailoverTransportTest.h
Fri Mar 13 19:18:53 2009
@@ -33,6 +33,8 @@
CPPUNIT_TEST( testTransportCreateWithBackups );
CPPUNIT_TEST( testTransportCreateFailOnCreate );
CPPUNIT_TEST( testFailingBackupCreation );
+ CPPUNIT_TEST( testSendOnewayMessage );
+ CPPUNIT_TEST( testSendOnewayMessageFail );
CPPUNIT_TEST_SUITE_END();
public:
@@ -56,6 +58,13 @@
// transports won't segfault and can be started and stopped without error.
void testFailingBackupCreation();
+ // Test that messages sent via the Oneway method are received.
+ void testSendOnewayMessage();
+
+ // Test that messages sent via the Oneway method are received after the
+ // first transport faults on the send and the second transport is created.
+ void testSendOnewayMessageFail();
+
};
}}}