Author: tabish
Date: Fri Apr 5 21:27:27 2013
New Revision: 1465136
URL: http://svn.apache.org/r1465136
Log:
https://issues.apache.org/jira/browse/AMQCPP-473
https://issues.apache.org/jira/browse/AMQCPP-472
https://issues.apache.org/jira/browse/AMQCPP-471
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
(with props)
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
(with props)
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp?rev=1465136&r1=1465135&r2=1465136&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
Fri Apr 5 21:27:27 2013
@@ -212,8 +212,8 @@ namespace core{
properties->getProperty("connection.optimizedAckScheduledAckInterval",
Long::toString(optimizedAckScheduledAckInterval)));
this->consumerFailoverRedeliveryWaitPeriod = Long::parseLong(
properties->getProperty("connection.consumerFailoverRedeliveryWaitPeriod",
Long::toString(consumerFailoverRedeliveryWaitPeriod)));
- this->nonBlockingRedelivery = Long::parseLong(
- properties->getProperty("connection.nonBlockingRedelivery",
Long::toString(nonBlockingRedelivery)));
+ this->nonBlockingRedelivery = Boolean::parseBoolean(
+ properties->getProperty("connection.nonBlockingRedelivery",
Boolean::toString(nonBlockingRedelivery)));
this->defaultPrefetchPolicy->configure(*properties);
this->defaultRedeliveryPolicy->configure(*properties);
@@ -407,6 +407,7 @@ void ActiveMQConnectionFactory::configur
connection->setExclusiveConsumer(this->settings->exclusiveConsumer);
connection->setTransactedIndividualAck(this->settings->transactedIndividualAck);
connection->setUseRetroactiveConsumer(this->settings->useRetroactiveConsumer);
+
connection->setNonBlockingRedelivery(this->settings->nonBlockingRedelivery);
connection->setConsumerFailoverRedeliveryWaitPeriod(this->settings->consumerFailoverRedeliveryWaitPeriod);
if (this->settings->defaultListener) {
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1465136&r1=1465135&r2=1465136&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
Fri Apr 5 21:27:27 2013
@@ -524,6 +524,8 @@ namespace {
Exception wrapper(ex.clone());
this->session->getConnection()->onAsyncException(wrapper);
}
+
+ this->consumer.reset(NULL);
}
};
@@ -560,7 +562,7 @@ namespace {
class OptimizedAckTask : public Runnable {
private:
- ActiveMQConsumerKernel* consumer;
+ Pointer<ActiveMQConsumerKernel> consumer;
ActiveMQConsumerKernelConfig* impl;
private:
@@ -570,7 +572,7 @@ namespace {
public:
- OptimizedAckTask(ActiveMQConsumerKernel* consumer,
ActiveMQConsumerKernelConfig* impl) :
+ OptimizedAckTask(Pointer<ActiveMQConsumerKernel> consumer,
ActiveMQConsumerKernelConfig* impl) :
Runnable(), consumer(consumer), impl(impl) {}
virtual ~OptimizedAckTask() {}
@@ -579,8 +581,11 @@ namespace {
if (impl->optimizeAcknowledge &&
!impl->unconsumedMessages->isClosed()) {
this->consumer->deliverAcks();
}
+
} catch(Exception& ex) {
+ impl->session->getConnection()->onAsyncException(ex);
}
+ this->consumer.reset(NULL);
}
};
@@ -618,6 +623,8 @@ namespace {
} catch (Exception& e) {
session->getConnection()->onAsyncException(e);
}
+
+ this->consumer.reset(NULL);
}
};
}
@@ -1928,7 +1935,9 @@ void ActiveMQConsumerKernel::setOptimize
// Should we periodically send out all outstanding acks.
if (this->internal->optimizeAcknowledge &&
this->internal->optimizedAckScheduledAckInterval > 0) {
- this->internal->optimizedAckTask = new OptimizedAckTask(this,
this->internal);
+ Pointer<ActiveMQConsumerKernel> self =
+
this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
+ this->internal->optimizedAckTask = new OptimizedAckTask(self,
this->internal);
try {
this->session->getScheduler()->executePeriodically(
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am?rev=1465136&r1=1465135&r2=1465136&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
Fri Apr 5 21:27:27 2013
@@ -47,6 +47,7 @@ cc_sources = \
activemq/test/openwire/OpenwireMapMessageTest.cpp \
activemq/test/openwire/OpenwireMessageCompressionTest.cpp \
activemq/test/openwire/OpenwireMessagePriorityTest.cpp \
+ activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp \
activemq/test/openwire/OpenwireOptimizedAckTest.cpp \
activemq/test/openwire/OpenwireQueueBrowserTest.cpp \
activemq/test/openwire/OpenwireSimpleRollbackTest.cpp \
@@ -105,6 +106,7 @@ h_sources = \
activemq/test/openwire/OpenwireMapMessageTest.h \
activemq/test/openwire/OpenwireMessageCompressionTest.h \
activemq/test/openwire/OpenwireMessagePriorityTest.h \
+ activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h \
activemq/test/openwire/OpenwireOptimizedAckTest.h \
activemq/test/openwire/OpenwireQueueBrowserTest.h \
activemq/test/openwire/OpenwireSimpleRollbackTest.h \
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp?rev=1465136&r1=1465135&r2=1465136&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
Fri Apr 5 21:27:27 2013
@@ -29,6 +29,7 @@
#include "activemq/test/openwire/OpenwireMessageCompressionTest.h"
#include "activemq/test/openwire/OpenwireMessagePriorityTest.h"
#include "activemq/test/openwire/OpenwireMapMessageTest.h"
+#include "activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h"
#include "activemq/test/openwire/OpenwireOptimizedAckTest.h"
#include "activemq/test/openwire/OpenwireQueueBrowserTest.h"
#include "activemq/test/openwire/OpenwireSimpleRollbackTest.h"
@@ -65,6 +66,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireMessageCompressionTest );
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireMessagePriorityTest );
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireMapMessageTest );
+CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireNonBlockingRedeliveryTest );
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireOptimizedAckTest );
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireQueueBrowserTest );
CPPUNIT_TEST_SUITE_REGISTRATION(
activemq::test::openwire::OpenwireSimpleRollbackTest );
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp?rev=1465136&view=auto
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
(added)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
Fri Apr 5 21:27:27 2013
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenwireNonBlockingRedeliveryTest.h"
+
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+
+#include <activemq/commands/Message.h>
+#include <activemq/commands/ProducerId.h>
+#include <activemq/commands/MessageId.h>
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/PrefetchPolicy.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+#include <decaf/lang/Thread.h>
+#include <decaf/lang/Pointer.h>
+#include <decaf/util/LinkedList.h>
+#include <decaf/util/concurrent/atomic/AtomicInteger.h>
+
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::commands;
+using namespace activemq::core;
+using namespace activemq::test;
+using namespace activemq::test::openwire;
+using namespace activemq::util;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+ class TestProducer : public Thread {
+ private:
+
+ std::string destinationName;
+ std::string brokerUri;
+ int produceMessages;
+
+ public:
+
+ TestProducer(const std::string& brokerUri,
+ const std::string& destinationName,
+ int produceMessages) : Thread(),
+ destinationName(destinationName),
+ brokerUri(brokerUri),
+ produceMessages(produceMessages) {
+ }
+
+ void run() {
+
+ Pointer<ActiveMQConnectionFactory> connectionFactory;
+ Pointer<Connection> connection;
+ Pointer<Session> session;
+ Pointer<Destination> destination;
+
+ try {
+
+ connectionFactory.reset(new
ActiveMQConnectionFactory(brokerUri));
+ connection.reset(connectionFactory->createConnection());
+ connection->start();
+
session.reset(connection->createSession(Session::AUTO_ACKNOWLEDGE));
+
+ destination.reset(session->createQueue(destinationName));
+
+ // Create a MessageProducer from the Session to the Topic or
Queue
+ Pointer<MessageProducer>
producer(session->createProducer(destination.get()));
+ producer->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
+
+ for (int i = 0; i < produceMessages; i++) {
+ Pointer<TextMessage> message(session->createTextMessage());
+ message->setLongProperty("TestTime",
System::currentTimeMillis());
+ try {
+ producer->send(message.get());
+ } catch (Exception& deeperException) {
+ }
+
+ Thread::sleep(50);
+ }
+ } catch (Exception& e) {
+ }
+
+ try {
+ if (connection != NULL) {
+ connection->close();
+ }
+ } catch (Exception& e) {
+ }
+ }
+ };
+
+ class TestConsumer : public Thread, public MessageListener {
+ private:
+
+ std::string brokerUri;
+ std::string destinationName;
+ CountDownLatch totalMessages;
+ int expected;
+ int receivedCount;
+ bool rolledBack;
+ bool failed;
+ LinkedList<int>* messages;
+ Pointer<ActiveMQConnectionFactory> connectionFactory;
+ Pointer<Connection> connection;
+ Pointer<Session> session;
+ Pointer<MessageConsumer> consumer;
+
+ public:
+
+ TestConsumer(const std::string& brokerUri,
+ const std::string& destinationName,
+ LinkedList<int>* messages,
+ int totalMessages) : Thread(),
+ brokerUri(brokerUri),
+ destinationName(destinationName),
+ totalMessages(totalMessages),
+ expected(totalMessages),
+ receivedCount(0),
+ rolledBack(false),
+ failed(false),
+ messages(messages),
+ connectionFactory(),
+ connection(),
+ session(),
+ consumer() {
+ }
+
+ bool isFailed() const {
+ return this->failed;
+ }
+
+ virtual void run() {
+ try {
+
+ connectionFactory.reset(new
ActiveMQConnectionFactory(brokerUri));
+ connection.reset(connectionFactory->createConnection());
+
session.reset(connection->createSession(Session::SESSION_TRANSACTED));
+
+ Pointer<ActiveMQConnection> amqCon =
connection.dynamicCast<ActiveMQConnection>();
+
+ RedeliveryPolicy* policy = amqCon->getRedeliveryPolicy();
+ policy->setInitialRedeliveryDelay(1000);
+ policy->setBackOffMultiplier(-1);
+ policy->setRedeliveryDelay(1000);
+ policy->setUseExponentialBackOff(false);
+ policy->setMaximumRedeliveries(10);
+
+ Pointer<Destination>
destination(session->createQueue(destinationName));
+ consumer.reset(session->createConsumer(destination.get()));
+ consumer->setMessageListener(this);
+
+ connection->start();
+
+ if (!totalMessages.await(10, TimeUnit::MINUTES)) {
+ this->failed = true;
+ }
+
+ } catch (Exception& e) {
+ }
+ try {
+ if (connection != NULL) {
+ connection->close();
+ }
+ } catch (Exception& e) {
+ }
+ }
+
+ virtual void onMessage(const cms::Message* message) {
+ receivedCount++;
+
+ try {
+
+ const commands::Message* amqMessage =
+ dynamic_cast<const commands::Message*>(message);
+
+ if (!rolledBack) {
+ if (++receivedCount == expected / 2) {
+ rolledBack = true;
+ session->rollback();
+ }
+ } else {
+ Pointer<MessageId> msgId = amqMessage->getMessageId();
+ messages->add((int)msgId->getProducerSequenceId());
+ session->commit();
+ totalMessages.countDown();
+ }
+
+ } catch (Exception& ex) {
+ this->failed = true;
+ }
+ }
+ };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenwireNonBlockingRedeliveryTest::OpenwireNonBlockingRedeliveryTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenwireNonBlockingRedeliveryTest::~OpenwireNonBlockingRedeliveryTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::string OpenwireNonBlockingRedeliveryTest::getBrokerURL() const {
+ return activemq::util::IntegrationCommon::getInstance().getOpenwireURL() +
+ "?connection.nonBlockingRedelivery=true";
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireNonBlockingRedeliveryTest::testConsumerMessagesAreNotOrdered() {
+
+ LinkedList<int> messages;
+
+ const std::string DEST_NAME = "QUEUE.FOO";
+
+ TestProducer producer(getBrokerURL(), DEST_NAME, 500);
+ TestConsumer consumer(getBrokerURL(), DEST_NAME, &messages, 500);
+
+ producer.start();
+ consumer.start();
+
+ producer.join();
+ consumer.join();
+
+ CPPUNIT_ASSERT(!consumer.isFailed());
+
+ bool ordered = true;
+ int lastId = 0;
+ Pointer<Iterator<int> > sequenceIds(messages.iterator());
+ while (sequenceIds->hasNext()) {
+ int id = sequenceIds->next();
+ if (id != (lastId + 1)) {
+ ordered = false;
+ }
+
+ lastId = id;
+ }
+
+ CPPUNIT_ASSERT(!ordered);
+}
Propchange:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h?rev=1465136&view=auto
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
(added)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
Fri Apr 5 21:27:27 2013
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TEST_OPENWIRE_OPENWIRENONBLOCKINGREDELIVERYTEST_H_
+#define _ACTIVEMQ_TEST_OPENWIRE_OPENWIRENONBLOCKINGREDELIVERYTEST_H_
+
+#include <activemq/test/MessagePriorityTest.h>
+
+namespace activemq {
+namespace test {
+namespace openwire {
+
+ class OpenwireNonBlockingRedeliveryTest : public MessagePriorityTest {
+
+ CPPUNIT_TEST_SUITE( OpenwireNonBlockingRedeliveryTest );
+ CPPUNIT_TEST( testConsumerMessagesAreNotOrdered );
+ CPPUNIT_TEST_SUITE_END();
+
+ public:
+
+ OpenwireNonBlockingRedeliveryTest();
+ virtual ~OpenwireNonBlockingRedeliveryTest();
+
+ virtual std::string getBrokerURL() const;
+
+ void testConsumerMessagesAreNotOrdered();
+
+ };
+
+}}}
+
+#endif /* _ACTIVEMQ_TEST_OPENWIRE_OPENWIRENONBLOCKINGREDELIVERYTEST_H_ */
Propchange:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireNonBlockingRedeliveryTest.h
------------------------------------------------------------------------------
svn:eol-style = native