Repository: activemq-cpp Updated Branches: refs/heads/master ace7a74cc -> 78172b2a1
https://issues.apache.org/jira/browse/AMQCPP-581 Ensure a pull request gets resent when the incoming message will be discard due to max redelivery exceeded. Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/78172b2a Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/78172b2a Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/78172b2a Branch: refs/heads/master Commit: 78172b2a11b17676e6417a4a52299444208726bf Parents: ace7a74 Author: Timothy Bish <[email protected]> Authored: Thu Jul 30 18:27:24 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Thu Jul 30 18:27:24 2015 -0400 ---------------------------------------------------------------------- .../core/kernels/ActiveMQConsumerKernel.cpp | 12 +- .../test/openwire/OpenwireSimpleTest.cpp | 374 +++++++++++-------- .../activemq/test/openwire/OpenwireSimpleTest.h | 2 + 3 files changed, 221 insertions(+), 167 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/78172b2a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp index 57b7110..f97148e 100644 --- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp +++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp @@ -1024,17 +1024,19 @@ decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long if (timeout > 0) { timeout = Math::max(deadline - System::currentTimeMillis(), 0LL); } - - continue; } else if (internal->redeliveryExceeded(dispatch)) { internal->posionAck(dispatch, "dispatch to " + getConsumerId()->toString() + " exceeds RedeliveryPolicy limit: " + Integer::toString(internal->redeliveryPolicy->getMaximumRedeliveries())); - } + if (timeout > 0) { + timeout = Math::max(deadline - System::currentTimeMillis(), 0LL); + } - // Return the message. - return dispatch; + sendPullRequest(timeout); + } else { + return dispatch; + } } return Pointer<MessageDispatch>(); http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/78172b2a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp index 34ab42f..c149dc9 100644 --- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp +++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp @@ -18,6 +18,7 @@ #include "OpenwireSimpleTest.h" #include <activemq/util/CMSListener.h> +#include <activemq/core/ActiveMQConnectionFactory.h> #include <activemq/core/ActiveMQConnection.h> #include <activemq/core/PrefetchPolicy.h> #include <activemq/exceptions/ActiveMQException.h> @@ -46,71 +47,69 @@ OpenwireSimpleTest::~OpenwireSimpleTest() { //////////////////////////////////////////////////////////////////////////////// void OpenwireSimpleTest::testWithZeroConsumerPrefetch() { - cmsProvider->setTopic( false ); - cmsProvider->setDestinationName( - UUID::randomUUID().toString() + "?consumer.prefetchSize=0" ); + cmsProvider->setTopic(false); + cmsProvider->setDestinationName(UUID::randomUUID().toString() + "?consumer.prefetchSize=0"); cmsProvider->reconnectSession(); // Create CMS Object for Comms - cms::Session* session( cmsProvider->getSession() ); + cms::Session* session(cmsProvider->getSession()); cms::MessageConsumer* consumer = cmsProvider->getConsumer(); cms::MessageProducer* producer = cmsProvider->getProducer(); - producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT ); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); - auto_ptr<cms::TextMessage> txtMessage( session->createTextMessage( "TEST MESSAGE" ) ); + auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE")); // Send some text messages - producer->send( txtMessage.get() ); + producer->send(txtMessage.get()); - auto_ptr<cms::Message> message( consumer->receive( 1000 ) ); - CPPUNIT_ASSERT( message.get() != NULL ); + auto_ptr<cms::Message> message(consumer->receive(1000)); + CPPUNIT_ASSERT(message.get() != NULL); } //////////////////////////////////////////////////////////////////////////////// void OpenwireSimpleTest::testWithZeroConsumerPrefetch2() { - cmsProvider->setTopic( false ); + cmsProvider->setTopic(false); ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection()); amqConnection->getPrefetchPolicy()->setQueuePrefetch(0); amqConnection->getPrefetchPolicy()->setTopicPrefetch(0); cmsProvider->reconnectSession(); // Create CMS Object for Comms - cms::Session* session( cmsProvider->getSession() ); + cms::Session* session(cmsProvider->getSession()); cms::MessageConsumer* consumer = cmsProvider->getConsumer(); cms::MessageProducer* producer = cmsProvider->getProducer(); - producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT ); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); - auto_ptr<cms::TextMessage> txtMessage( session->createTextMessage( "TEST MESSAGE" ) ); + auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE")); // Send some text messages - producer->send( txtMessage.get() ); + producer->send(txtMessage.get()); - auto_ptr<cms::Message> message( consumer->receive( 1000 ) ); - CPPUNIT_ASSERT( message.get() != NULL ); + auto_ptr<cms::Message> message(consumer->receive(1000)); + CPPUNIT_ASSERT(message.get() != NULL); } //////////////////////////////////////////////////////////////////////////////// void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndNoMessage() { - cmsProvider->setTopic( false ); - cmsProvider->setDestinationName( - UUID::randomUUID().toString() + "?consumer.prefetchSize=0" ); + cmsProvider->setTopic(false); + cmsProvider->setDestinationName(UUID::randomUUID().toString() + "?consumer.prefetchSize=0"); cmsProvider->reconnectSession(); // Create CMS Object for Comms - cms::Session* session( cmsProvider->getSession() ); + cms::Session* session(cmsProvider->getSession()); cms::MessageConsumer* consumer = cmsProvider->getConsumer(); // Should be no message and no exceptions - auto_ptr<cms::Message> message( consumer->receiveNoWait() ); - CPPUNIT_ASSERT( message.get() == NULL ); + auto_ptr<cms::Message> message(consumer->receiveNoWait()); + CPPUNIT_ASSERT(message.get() == NULL); // Should be no message and no exceptions - message.reset( consumer->receive(1000) ); - CPPUNIT_ASSERT( message.get() == NULL ); + message.reset(consumer->receive(1000)); + CPPUNIT_ASSERT(message.get() == NULL); consumer->close(); session->close(); @@ -119,23 +118,23 @@ void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndNoMessage() { //////////////////////////////////////////////////////////////////////////////// void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndNoMessage2() { - cmsProvider->setTopic( false ); + cmsProvider->setTopic(false); ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection()); amqConnection->getPrefetchPolicy()->setQueuePrefetch(0); amqConnection->getPrefetchPolicy()->setTopicPrefetch(0); cmsProvider->reconnectSession(); // Create CMS Object for Comms - cms::Session* session( cmsProvider->getSession() ); + cms::Session* session(cmsProvider->getSession()); cms::MessageConsumer* consumer = cmsProvider->getConsumer(); // Should be no message and no exceptions - auto_ptr<cms::Message> message( consumer->receiveNoWait() ); - CPPUNIT_ASSERT( message.get() == NULL ); + auto_ptr<cms::Message> message(consumer->receiveNoWait()); + CPPUNIT_ASSERT(message.get() == NULL); // Should be no message and no exceptions - message.reset( consumer->receive(1000) ); - CPPUNIT_ASSERT( message.get() == NULL ); + message.reset(consumer->receive(1000)); + CPPUNIT_ASSERT(message.get() == NULL); consumer->close(); session->close(); @@ -144,17 +143,16 @@ void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndNoMessage2() { //////////////////////////////////////////////////////////////////////////////// void OpenwireSimpleTest::testMapMessageSendToQueue() { - cmsProvider->setTopic( false ); - cmsProvider->setDestinationName( - UUID::randomUUID().toString() + "?consumer.prefetchSize=0" ); + cmsProvider->setTopic(false); + cmsProvider->setDestinationName(UUID::randomUUID().toString() + "?consumer.prefetchSize=0"); cmsProvider->reconnectSession(); // Create CMS Object for Comms - cms::Session* session( cmsProvider->getSession() ); + cms::Session* session(cmsProvider->getSession()); cms::MessageConsumer* consumer = cmsProvider->getConsumer(); cms::MessageProducer* producer = cmsProvider->getProducer(); - producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT ); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); unsigned char byteValue = 'A'; char charValue = 'B'; @@ -166,54 +164,54 @@ void OpenwireSimpleTest::testMapMessageSendToQueue() { double doubleValue = 654564.654654; std::string stringValue = "The test string"; - auto_ptr<cms::MapMessage> mapMessage( session->createMapMessage() ); + auto_ptr<cms::MapMessage> mapMessage(session->createMapMessage()); - mapMessage->setString( "stringKey", stringValue ); - mapMessage->setBoolean( "boolKey", booleanValue ); - mapMessage->setByte( "byteKey", byteValue ); - mapMessage->setChar( "charKey", charValue ); - mapMessage->setShort( "shortKey", shortValue ); - mapMessage->setInt( "intKey", intValue ); - mapMessage->setLong( "longKey", longValue ); - mapMessage->setFloat( "floatKey", floatValue ); - mapMessage->setDouble( "doubleKey", doubleValue ); + mapMessage->setString("stringKey", stringValue); + mapMessage->setBoolean("boolKey", booleanValue); + mapMessage->setByte("byteKey", byteValue); + mapMessage->setChar("charKey", charValue); + mapMessage->setShort("shortKey", shortValue); + mapMessage->setInt("intKey", intValue); + mapMessage->setLong("longKey", longValue); + mapMessage->setFloat("floatKey", floatValue); + mapMessage->setDouble("doubleKey", doubleValue); std::vector<unsigned char> bytes; - bytes.push_back( 65 ); - bytes.push_back( 66 ); - bytes.push_back( 67 ); - bytes.push_back( 68 ); - bytes.push_back( 69 ); - mapMessage->setBytes( "bytesKey", bytes ); + bytes.push_back(65); + bytes.push_back(66); + bytes.push_back(67); + bytes.push_back(68); + bytes.push_back(69); + mapMessage->setBytes("bytesKey", bytes); // Send some text messages - producer->send( mapMessage.get() ); - - auto_ptr<cms::Message> message( consumer->receive( 2000 ) ); - CPPUNIT_ASSERT( message.get() != NULL ); - - cms::MapMessage* recvMapMessage = dynamic_cast<MapMessage*>( message.get() ); - CPPUNIT_ASSERT( recvMapMessage != NULL ); - CPPUNIT_ASSERT( recvMapMessage->getString( "stringKey" ) == stringValue ); - CPPUNIT_ASSERT( recvMapMessage->getBoolean( "boolKey" ) == booleanValue ); - CPPUNIT_ASSERT( recvMapMessage->getByte( "byteKey" ) == byteValue ); - CPPUNIT_ASSERT( recvMapMessage->getChar( "charKey" ) == charValue ); - CPPUNIT_ASSERT( recvMapMessage->getShort( "shortKey" ) == shortValue ); - CPPUNIT_ASSERT( recvMapMessage->getInt( "intKey" ) == intValue ); - CPPUNIT_ASSERT( recvMapMessage->getLong( "longKey" ) == longValue ); - CPPUNIT_ASSERT( recvMapMessage->getFloat( "floatKey" ) == floatValue ); - CPPUNIT_ASSERT( recvMapMessage->getDouble( "doubleKey" ) == doubleValue ); - CPPUNIT_ASSERT( recvMapMessage->getBytes( "bytesKey" ) == bytes ); + producer->send(mapMessage.get()); + + auto_ptr<cms::Message> message(consumer->receive(2000)); + CPPUNIT_ASSERT(message.get() != NULL); + + cms::MapMessage* recvMapMessage = dynamic_cast<MapMessage*>(message.get()); + CPPUNIT_ASSERT(recvMapMessage != NULL); + CPPUNIT_ASSERT(recvMapMessage->getString("stringKey") == stringValue); + CPPUNIT_ASSERT(recvMapMessage->getBoolean("boolKey") == booleanValue); + CPPUNIT_ASSERT(recvMapMessage->getByte("byteKey") == byteValue); + CPPUNIT_ASSERT(recvMapMessage->getChar("charKey") == charValue); + CPPUNIT_ASSERT(recvMapMessage->getShort("shortKey") == shortValue); + CPPUNIT_ASSERT(recvMapMessage->getInt("intKey") == intValue); + CPPUNIT_ASSERT(recvMapMessage->getLong("longKey") == longValue); + CPPUNIT_ASSERT(recvMapMessage->getFloat("floatKey") == floatValue); + CPPUNIT_ASSERT(recvMapMessage->getDouble("doubleKey") == doubleValue); + CPPUNIT_ASSERT(recvMapMessage->getBytes("bytesKey") == bytes); } //////////////////////////////////////////////////////////////////////////////// void OpenwireSimpleTest::testMapMessageSendToTopic() { // Create CMS Object for Comms - cms::Session* session( cmsProvider->getSession() ); + cms::Session* session(cmsProvider->getSession()); cms::MessageConsumer* consumer = cmsProvider->getConsumer(); cms::MessageProducer* producer = cmsProvider->getProducer(); - producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT ); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); unsigned char byteValue = 'A'; char charValue = 'B'; @@ -225,44 +223,44 @@ void OpenwireSimpleTest::testMapMessageSendToTopic() { double doubleValue = 654564.654654; std::string stringValue = "The test string"; - auto_ptr<cms::MapMessage> mapMessage( session->createMapMessage() ); + auto_ptr<cms::MapMessage> mapMessage(session->createMapMessage()); - mapMessage->setString( "stringKey", stringValue ); - mapMessage->setBoolean( "boolKey", booleanValue ); - mapMessage->setByte( "byteKey", byteValue ); - mapMessage->setChar( "charKey", charValue ); - mapMessage->setShort( "shortKey", shortValue ); - mapMessage->setInt( "intKey", intValue ); - mapMessage->setLong( "longKey", longValue ); - mapMessage->setFloat( "floatKey", floatValue ); - mapMessage->setDouble( "doubleKey", doubleValue ); + mapMessage->setString("stringKey", stringValue); + mapMessage->setBoolean("boolKey", booleanValue); + mapMessage->setByte("byteKey", byteValue); + mapMessage->setChar("charKey", charValue); + mapMessage->setShort("shortKey", shortValue); + mapMessage->setInt("intKey", intValue); + mapMessage->setLong("longKey", longValue); + mapMessage->setFloat("floatKey", floatValue); + mapMessage->setDouble("doubleKey", doubleValue); std::vector<unsigned char> bytes; - bytes.push_back( 65 ); - bytes.push_back( 66 ); - bytes.push_back( 67 ); - bytes.push_back( 68 ); - bytes.push_back( 69 ); - mapMessage->setBytes( "bytesKey", bytes ); + bytes.push_back(65); + bytes.push_back(66); + bytes.push_back(67); + bytes.push_back(68); + bytes.push_back(69); + mapMessage->setBytes("bytesKey", bytes); // Send some text messages - producer->send( mapMessage.get() ); - - auto_ptr<cms::Message> message( consumer->receive( 2000 ) ); - CPPUNIT_ASSERT( message.get() != NULL ); - - cms::MapMessage* recvMapMessage = dynamic_cast<MapMessage*>( message.get() ); - CPPUNIT_ASSERT( recvMapMessage != NULL ); - CPPUNIT_ASSERT( recvMapMessage->getString( "stringKey" ) == stringValue ); - CPPUNIT_ASSERT( recvMapMessage->getBoolean( "boolKey" ) == booleanValue ); - CPPUNIT_ASSERT( recvMapMessage->getByte( "byteKey" ) == byteValue ); - CPPUNIT_ASSERT( recvMapMessage->getChar( "charKey" ) == charValue ); - CPPUNIT_ASSERT( recvMapMessage->getShort( "shortKey" ) == shortValue ); - CPPUNIT_ASSERT( recvMapMessage->getInt( "intKey" ) == intValue ); - CPPUNIT_ASSERT( recvMapMessage->getLong( "longKey" ) == longValue ); - CPPUNIT_ASSERT( recvMapMessage->getFloat( "floatKey" ) == floatValue ); - CPPUNIT_ASSERT( recvMapMessage->getDouble( "doubleKey" ) == doubleValue ); - CPPUNIT_ASSERT( recvMapMessage->getBytes( "bytesKey" ) == bytes ); + producer->send(mapMessage.get()); + + auto_ptr<cms::Message> message(consumer->receive(2000)); + CPPUNIT_ASSERT(message.get() != NULL); + + cms::MapMessage* recvMapMessage = dynamic_cast<MapMessage*>(message.get()); + CPPUNIT_ASSERT(recvMapMessage != NULL); + CPPUNIT_ASSERT(recvMapMessage->getString("stringKey") == stringValue); + CPPUNIT_ASSERT(recvMapMessage->getBoolean("boolKey") == booleanValue); + CPPUNIT_ASSERT(recvMapMessage->getByte("byteKey") == byteValue); + CPPUNIT_ASSERT(recvMapMessage->getChar("charKey") == charValue); + CPPUNIT_ASSERT(recvMapMessage->getShort("shortKey") == shortValue); + CPPUNIT_ASSERT(recvMapMessage->getInt("intKey") == intValue); + CPPUNIT_ASSERT(recvMapMessage->getLong("longKey") == longValue); + CPPUNIT_ASSERT(recvMapMessage->getFloat("floatKey") == floatValue); + CPPUNIT_ASSERT(recvMapMessage->getDouble("doubleKey") == doubleValue); + CPPUNIT_ASSERT(recvMapMessage->getBytes("bytesKey") == bytes); } //////////////////////////////////////////////////////////////////////////////// @@ -270,41 +268,40 @@ void OpenwireSimpleTest::testDestroyDestination() { try { - cmsProvider->setDestinationName( "testDestroyDestination" ); + cmsProvider->setDestinationName("testDestroyDestination"); cmsProvider->reconnectSession(); // Create CMS Object for Comms - cms::Session* session( cmsProvider->getSession() ); + cms::Session* session(cmsProvider->getSession()); cms::MessageConsumer* consumer = cmsProvider->getConsumer(); cms::MessageProducer* producer = cmsProvider->getProducer(); - producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT ); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); - auto_ptr<cms::TextMessage> txtMessage( session->createTextMessage( "TEST MESSAGE" ) ); + auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE")); // Send some text messages - producer->send( txtMessage.get() ); + producer->send(txtMessage.get()); - auto_ptr<cms::Message> message( consumer->receive( 1000 ) ); - CPPUNIT_ASSERT( message.get() != NULL ); + auto_ptr<cms::Message> message(consumer->receive(1000)); + CPPUNIT_ASSERT(message.get() != NULL); - ActiveMQConnection* connection = - dynamic_cast<ActiveMQConnection*>( cmsProvider->getConnection() ); + ActiveMQConnection* connection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection()); - CPPUNIT_ASSERT( connection != NULL ); + CPPUNIT_ASSERT(connection != NULL); - try{ - connection->destroyDestination( cmsProvider->getDestination() ); - CPPUNIT_ASSERT_MESSAGE( "Destination Should be in use.", false ); - } catch( ActiveMQException& ex ) { + try { + connection->destroyDestination(cmsProvider->getDestination()); + CPPUNIT_ASSERT_MESSAGE("Destination Should be in use.", false); + } catch (ActiveMQException& ex) { } cmsProvider->reconnectSession(); - connection->destroyDestination( cmsProvider->getDestination() ); + connection->destroyDestination(cmsProvider->getDestination()); - } catch( ActiveMQException& ex ) { + } catch (ActiveMQException& ex) { ex.printStackTrace(); - CPPUNIT_ASSERT_MESSAGE( "CAUGHT EXCEPTION", false ); + CPPUNIT_ASSERT_MESSAGE("CAUGHT EXCEPTION", false); } } @@ -312,10 +309,10 @@ void OpenwireSimpleTest::testDestroyDestination() { void OpenwireSimpleTest::tesstStreamMessage() { // Create CMS Object for Comms - cms::Session* session( cmsProvider->getSession() ); + cms::Session* session(cmsProvider->getSession()); cms::MessageConsumer* consumer = cmsProvider->getConsumer(); cms::MessageProducer* producer = cmsProvider->getProducer(); - producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT ); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); unsigned char byteValue = 'A'; char charValue = 'B'; @@ -327,45 +324,45 @@ void OpenwireSimpleTest::tesstStreamMessage() { double doubleValue = 654564.654654; std::string stringValue = "The test string"; - auto_ptr<cms::StreamMessage> streamMessage( session->createStreamMessage() ); + auto_ptr<cms::StreamMessage> streamMessage(session->createStreamMessage()); - streamMessage->writeString( stringValue ); - streamMessage->writeBoolean( booleanValue ); - streamMessage->writeByte( byteValue ); - streamMessage->writeChar( charValue ); - streamMessage->writeShort( shortValue ); - streamMessage->writeInt( intValue ); - streamMessage->writeLong( longValue ); - streamMessage->writeFloat( floatValue ); - streamMessage->writeDouble( doubleValue ); + streamMessage->writeString(stringValue); + streamMessage->writeBoolean(booleanValue); + streamMessage->writeByte(byteValue); + streamMessage->writeChar(charValue); + streamMessage->writeShort(shortValue); + streamMessage->writeInt(intValue); + streamMessage->writeLong(longValue); + streamMessage->writeFloat(floatValue); + streamMessage->writeDouble(doubleValue); std::vector<unsigned char> bytes; - std::vector<unsigned char> readBytes( 100 ); - bytes.push_back( 65 ); - bytes.push_back( 66 ); - bytes.push_back( 67 ); - bytes.push_back( 68 ); - bytes.push_back( 69 ); - streamMessage->writeBytes( bytes ); + std::vector<unsigned char> readBytes(100); + bytes.push_back(65); + bytes.push_back(66); + bytes.push_back(67); + bytes.push_back(68); + bytes.push_back(69); + streamMessage->writeBytes(bytes); // Send some text messages - producer->send( streamMessage.get() ); - - auto_ptr<cms::Message> message( consumer->receive( 2000 ) ); - CPPUNIT_ASSERT( message.get() != NULL ); - - cms::StreamMessage* rcvStreamMessage = dynamic_cast<StreamMessage*>( message.get() ); - CPPUNIT_ASSERT( rcvStreamMessage != NULL ); - CPPUNIT_ASSERT( rcvStreamMessage->readString() == stringValue ); - CPPUNIT_ASSERT( rcvStreamMessage->readBoolean() == booleanValue ); - CPPUNIT_ASSERT( rcvStreamMessage->readByte() == byteValue ); - CPPUNIT_ASSERT( rcvStreamMessage->readChar() == charValue ); - CPPUNIT_ASSERT( rcvStreamMessage->readShort() == shortValue ); - CPPUNIT_ASSERT( rcvStreamMessage->readInt() == intValue ); - CPPUNIT_ASSERT( rcvStreamMessage->readLong() == longValue ); - CPPUNIT_ASSERT( rcvStreamMessage->readFloat() == floatValue ); - CPPUNIT_ASSERT( rcvStreamMessage->readDouble() == doubleValue ); - CPPUNIT_ASSERT( rcvStreamMessage->readBytes( readBytes ) == (int)bytes.size() ); + producer->send(streamMessage.get()); + + auto_ptr<cms::Message> message(consumer->receive(2000)); + CPPUNIT_ASSERT(message.get() != NULL); + + cms::StreamMessage* rcvStreamMessage = dynamic_cast<StreamMessage*>(message.get()); + CPPUNIT_ASSERT(rcvStreamMessage != NULL); + CPPUNIT_ASSERT(rcvStreamMessage->readString() == stringValue); + CPPUNIT_ASSERT(rcvStreamMessage->readBoolean() == booleanValue); + CPPUNIT_ASSERT(rcvStreamMessage->readByte() == byteValue); + CPPUNIT_ASSERT(rcvStreamMessage->readChar() == charValue); + CPPUNIT_ASSERT(rcvStreamMessage->readShort() == shortValue); + CPPUNIT_ASSERT(rcvStreamMessage->readInt() == intValue); + CPPUNIT_ASSERT(rcvStreamMessage->readLong() == longValue); + CPPUNIT_ASSERT(rcvStreamMessage->readFloat() == floatValue); + CPPUNIT_ASSERT(rcvStreamMessage->readDouble() == doubleValue); + CPPUNIT_ASSERT(rcvStreamMessage->readBytes(readBytes) == (int )bytes.size()); } //////////////////////////////////////////////////////////////////////////////// @@ -392,16 +389,69 @@ void OpenwireSimpleTest::testReceiveWithSessionSyncDispatch() { cmsProvider->reconnectSession(); // Create CMS Object for Comms - cms::Session* session( cmsProvider->getSession() ); + cms::Session* session(cmsProvider->getSession()); cms::MessageConsumer* consumer = cmsProvider->getConsumer(); cms::MessageProducer* producer = cmsProvider->getProducer(); - producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT ); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); - auto_ptr<cms::TextMessage> txtMessage( session->createTextMessage( "TEST MESSAGE" ) ); + auto_ptr<cms::TextMessage> txtMessage(session->createTextMessage("TEST MESSAGE")); // Send some text messages - producer->send( txtMessage.get() ); + producer->send(txtMessage.get()); + + auto_ptr<cms::Message> message(consumer->receive(1000)); + CPPUNIT_ASSERT(message.get() != NULL); +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenwireSimpleTest::testWithZeroConsumerPrefetchAndZeroRedelivery() { + + ActiveMQConnectionFactory factory(getBrokerURL()); + auto_ptr<cms::Connection> connection(factory.createConnection()); + + connection->start(); + + { + auto_ptr<cms::Session> session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE)); + auto_ptr<cms::Queue> queue(session->createQueue("testWithZeroConsumerPrefetchAndZeroRedelivery")); + auto_ptr<cms::MessageProducer> producer(session->createProducer(queue.get())); + + auto_ptr<cms::Message> message(session->createTextMessage("Hello")); + producer->send(message.get()); + producer->close(); + session->close(); + } + + { + auto_ptr<cms::Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED)); + auto_ptr<cms::Queue> queue(session->createQueue("testWithZeroConsumerPrefetchAndZeroRedelivery")); + auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(queue.get())); + + auto_ptr<cms::Message> message(consumer->receive(5000)); + CPPUNIT_ASSERT(message.get() != NULL); + + session->rollback(); + session->close(); + connection->close(); + } + + connection.reset(factory.createConnection()); + connection->start(); + ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(connection.get()); + + // Now we test the zero prefetc + zero max redelivery case. + amqConnection->getRedeliveryPolicy()->setMaximumRedeliveries(0); + amqConnection->getPrefetchPolicy()->setQueuePrefetch(0); + + auto_ptr<cms::Session> session(connection->createSession(cms::Session::SESSION_TRANSACTED)); + auto_ptr<cms::Queue> queue(session->createQueue("testWithZeroConsumerPrefetchAndZeroRedelivery")); + auto_ptr<cms::MessageConsumer> consumer(session->createConsumer(queue.get())); + + auto_ptr<cms::Message> message(consumer->receive(5000)); + CPPUNIT_ASSERT(message.get() == NULL); + + session->commit(); + session->close(); - auto_ptr<cms::Message> message( consumer->receive( 1000 ) ); - CPPUNIT_ASSERT( message.get() != NULL ); + amqConnection->destroyDestination(queue.get()); } http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/78172b2a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h index 1d399a1..d4d623f 100644 --- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h +++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h @@ -43,6 +43,7 @@ namespace openwire{ CPPUNIT_TEST( testWithZeroConsumerPrefetchAndNoMessage ); CPPUNIT_TEST( testWithZeroConsumerPrefetch2 ); CPPUNIT_TEST( testWithZeroConsumerPrefetchAndNoMessage2 ); + CPPUNIT_TEST( testWithZeroConsumerPrefetchAndZeroRedelivery ); CPPUNIT_TEST( testMapMessageSendToQueue ); CPPUNIT_TEST( testMapMessageSendToTopic ); CPPUNIT_TEST( testDestroyDestination ); @@ -66,6 +67,7 @@ namespace openwire{ void testWithZeroConsumerPrefetchAndNoMessage(); void testWithZeroConsumerPrefetch2(); void testWithZeroConsumerPrefetchAndNoMessage2(); + void testWithZeroConsumerPrefetchAndZeroRedelivery(); void testMapMessageSendToQueue(); void testMapMessageSendToTopic(); void tesstStreamMessage();
