Updated Branches: refs/heads/trunk 22b2b7570 -> 66103dd16
fix https://issues.apache.org/jira/browse/AMQCPP-509 Adds support for alwaysSessionAsync option so that threads in Session can be bypassed if needed. Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/66103dd1 Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/66103dd1 Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/66103dd1 Branch: refs/heads/trunk Commit: 66103dd16416e162a329512034fc46f4fcfd271b Parents: 22b2b75 Author: Timothy Bish <[email protected]> Authored: Fri Sep 6 17:39:02 2013 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri Sep 6 17:54:23 2013 -0400 ---------------------------------------------------------------------- .../main/activemq/core/ActiveMQConnection.cpp | 19 +++++++++++++--- .../src/main/activemq/core/ActiveMQConnection.h | 17 +++++++++++++-- .../activemq/core/ActiveMQConnectionFactory.cpp | 15 +++++++++++++ .../activemq/core/ActiveMQConnectionFactory.h | 22 +++++++++++++++++-- .../activemq/core/ActiveMQSessionExecutor.cpp | 13 ++++++++--- .../core/kernels/ActiveMQSessionKernel.cpp | 16 ++++++++++++-- .../core/kernels/ActiveMQSessionKernel.h | 14 ++++++++++++ .../test/openwire/OpenwireSimpleTest.cpp | 23 ++++++++++++++++++++ .../activemq/test/openwire/OpenwireSimpleTest.h | 2 ++ 9 files changed, 129 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp index 0569f97..a8e518d 100644 --- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp +++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp @@ -90,8 +90,8 @@ using namespace decaf::lang; using namespace decaf::lang::exceptions; //////////////////////////////////////////////////////////////////////////////// -namespace activemq{ -namespace core{ +namespace activemq { +namespace core { class ConnectionThreadFactory : public ThreadFactory { private: @@ -176,6 +176,7 @@ namespace core{ bool exclusiveConsumer; bool transactedIndividualAck; bool nonBlockingRedelivery; + bool alwaysSessionAsync; int compressionLevel; unsigned int sendTimeout; unsigned int closeTimeout; @@ -243,6 +244,7 @@ namespace core{ exclusiveConsumer(false), transactedIndividualAck(false), nonBlockingRedelivery(false), + alwaysSessionAsync(true), compressionLevel(-1), sendTimeout(0), closeTimeout(15000), @@ -544,7 +546,8 @@ cms::Session* ActiveMQConnection::createSession(cms::Session::AcknowledgeMode ac // Create the session instance as a Session Kernel we then create and return a // ActiveMQSession instance that acts as a proxy to the kernel caller can delete // that at any time since we only refer to the Pointer to the session kernel. - Pointer<ActiveMQSessionKernel> session(new ActiveMQSessionKernel(this, getNextSessionId(), ackMode, *this->config->properties)); + Pointer<ActiveMQSessionKernel> session( + new ActiveMQSessionKernel(this, getNextSessionId(), ackMode, *this->config->properties)); session->setMessageTransformer(this->config->transformer); @@ -1924,3 +1927,13 @@ bool ActiveMQConnection::isDuplicate(Dispatcher* dispatcher, Pointer<commands::M void ActiveMQConnection::rollbackDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message) { this->config->connectionAudit.rollbackDuplicate(dispatcher, message); } + +//////////////////////////////////////////////////////////////////////////////// +bool ActiveMQConnection::isAlwaysSessionAsync() const { + return this->config->alwaysSessionAsync; +} + +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQConnection::setAlwaysSessionAsync(bool alwaysSessionAsync) { + this->config->alwaysSessionAsync = alwaysSessionAsync; +} http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h index 4b5a10c..08e5d24 100644 --- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h +++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h @@ -41,8 +41,8 @@ #include <string> #include <memory> -namespace activemq{ -namespace core{ +namespace activemq { +namespace core { using decaf::lang::Pointer; @@ -786,6 +786,19 @@ namespace core{ */ void setSendAcksAsync(bool sendAcksAsync); + /** + * @return Returns the alwaysSessionAsync configuration setting. + */ + bool isAlwaysSessionAsync() const; + + /** + * If this flag is not set then a separate thread is not used for dispatching messages + * for each Session in the Connection. However, a separate thread is always used if there + * is more than one session, or the session isn't in auto acknowledge or duplicates ok mode. + * By default this value is set to true and session dispatch happens asynchronously. + */ + void setAlwaysSessionAsync(bool alwaysSessionAsync); + public: // TransportListener /** http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp index 13d680f..50245be 100644 --- a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp +++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp @@ -89,6 +89,7 @@ namespace core{ bool exclusiveConsumer; bool transactedIndividualAck; bool nonBlockingRedelivery; + bool alwaysSessionAsync; int compressionLevel; unsigned int sendTimeout; unsigned int closeTimeout; @@ -123,6 +124,7 @@ namespace core{ exclusiveConsumer(false), transactedIndividualAck(false), nonBlockingRedelivery(false), + alwaysSessionAsync(true), compressionLevel(-1), sendTimeout(0), closeTimeout(15000), @@ -216,6 +218,8 @@ namespace core{ properties->getProperty("connection.nonBlockingRedelivery", Boolean::toString(nonBlockingRedelivery))); this->watchTopicAdvisories = Boolean::parseBoolean( properties->getProperty("connection.watchTopicAdvisories", Boolean::toString(watchTopicAdvisories))); + this->alwaysSessionAsync = Boolean::parseBoolean( + properties->getProperty("connection.alwaysSessionAsync", Boolean::toString(alwaysSessionAsync))); this->defaultPrefetchPolicy->configure(*properties); this->defaultRedeliveryPolicy->configure(*properties); @@ -411,6 +415,7 @@ void ActiveMQConnectionFactory::configureConnection(ActiveMQConnection* connecti connection->setUseRetroactiveConsumer(this->settings->useRetroactiveConsumer); connection->setNonBlockingRedelivery(this->settings->nonBlockingRedelivery); connection->setConsumerFailoverRedeliveryWaitPeriod(this->settings->consumerFailoverRedeliveryWaitPeriod); + connection->setAlwaysSessionAsync(this->settings->alwaysSessionAsync); if (this->settings->defaultListener) { connection->setExceptionListener(this->settings->defaultListener); @@ -732,3 +737,13 @@ bool ActiveMQConnectionFactory::isExclusiveConsumer() const { void ActiveMQConnectionFactory::setExclusiveConsumer(bool exclusiveConsumer) { this->settings->exclusiveConsumer = exclusiveConsumer; } + +//////////////////////////////////////////////////////////////////////////////// +bool ActiveMQConnectionFactory::isAlwaysSessionAsync() const { + return this->settings->alwaysSessionAsync; +} + +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQConnectionFactory::setAlwaysSessionAsync(bool alwaysSessionAsync) { + this->settings->alwaysSessionAsync = alwaysSessionAsync; +} http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h index e2be727..3828f70 100644 --- a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h +++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h @@ -27,8 +27,8 @@ #include <decaf/net/URI.h> #include <decaf/util/Properties.h> -namespace activemq{ -namespace core{ +namespace activemq { +namespace core { using decaf::lang::Pointer; @@ -640,6 +640,24 @@ namespace core{ */ void setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval); + /** + * Returns the current value of the always session async option. + * + * @return Returns the alwaysSessionAsync configuration setting. + */ + bool isAlwaysSessionAsync() const; + + /** + * If this flag is not set 'true' then a separate thread is not used for dispatching messages + * for each Session in the Connection. However, a separate thread is always used if there + * is more than one session, or the session isn't in auto acknowledge or duplicates ok mode. + * By default this value is set to true and session dispatch happens asynchronously. + * + * @param alwaysSessionAsync + * The alwaysSessionAsync value to use when creating new sessions. + */ + void setAlwaysSessionAsync(bool alwaysSessionAsync); + public: /** http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp index 48a919c..5392950 100644 --- a/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp +++ b/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp @@ -81,9 +81,12 @@ ActiveMQSessionExecutor::~ActiveMQSessionExecutor() { //////////////////////////////////////////////////////////////////////////////// void ActiveMQSessionExecutor::execute(const Pointer<MessageDispatch>& dispatch) { - // Add the data to the queue. - this->messageQueue->enqueue(dispatch); - this->wakeup(); + if (this->session->isSessionAsyncDispatch()) { + this->messageQueue->enqueue(dispatch); + this->wakeup(); + } else { + this->dispatch(dispatch); + } } //////////////////////////////////////////////////////////////////////////////// @@ -97,6 +100,10 @@ void ActiveMQSessionExecutor::executeFirst(const Pointer<MessageDispatch>& dispa //////////////////////////////////////////////////////////////////////////////// void ActiveMQSessionExecutor::wakeup() { + if (!this->session->isSessionAsyncDispatch()) { + return; + } + Pointer<TaskRunner> taskRunner; synchronized(messageQueue.get()) { if (this->taskRunner == NULL) { http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp index 0f320e8..776487f 100644 --- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp +++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp @@ -100,13 +100,14 @@ namespace kernels{ Mutex sendMutex; cms::MessageTransformer* transformer; int hashCode; + bool sessionAsyncDispatch; public: SessionConfig() : synchronizationRegistered(false), producerLock(), producers(), consumerLock(), consumers(), scheduler(), closeSync(), sendMutex(), transformer(NULL), - hashCode() {} + hashCode(), sessionAsyncDispatch(true) {} ~SessionConfig() {} }; @@ -226,8 +227,9 @@ ActiveMQSessionKernel::ActiveMQSessionKernel(ActiveMQConnection* connection, this->closed.set(false); this->lastDeliveredSequenceId = -1; + this->config->sessionAsyncDispatch = connection->isAlwaysSessionAsync(); - // Create a Transaction objet + // Create a Transaction object this->transaction.reset(new ActiveMQTransactionContext(this, properties)); // Create the session executor object. @@ -1495,3 +1497,13 @@ void ActiveMQSessionKernel::sendAck(Pointer<MessageAck> ack, bool async) { this->connection->syncRequest(ack); } } + +//////////////////////////////////////////////////////////////////////////////// +bool ActiveMQSessionKernel::isSessionAsyncDispatch() const { + return this->config->sessionAsyncDispatch; +} + +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQSessionKernel::setSessionAsyncDispatch(bool sessionAsyncDispatch) { + this->config->sessionAsyncDispatch = sessionAsyncDispatch; +} http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h index f4af526..f05c56a 100644 --- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h +++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h @@ -568,6 +568,20 @@ namespace kernels { */ void sendAck(decaf::lang::Pointer<commands::MessageAck> ack, bool async = false); + /** + * Returns true if this session is dispatching messages to its consumers asynchronously. + * + * @return Returns the sessionAsyncDispatch. + */ + bool isSessionAsyncDispatch() const; + + /** + * Configures asynchronous message dispatch to this session's consumers. + * + * @param sessionAsyncDispatch The sessionAsyncDispatch to set. + */ + void setSessionAsyncDispatch(bool sessionAsyncDispatch); + private: /** http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp index 7d3d782..5ce97d4 100644 --- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp +++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp @@ -383,3 +383,26 @@ void OpenwireSimpleTest::testMessageIdSetOnSend() { CPPUNIT_ASSERT(message->getCMSMessageID() != ""); CPPUNIT_ASSERT(message->getCMSDestination() != NULL); } + +//////////////////////////////////////////////////////////////////////////////// +void OpenwireSimpleTest::testReceiveWithSessionSyncDispatch() { + + ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection()); + amqConnection->setAlwaysSessionAsync(false); + + cmsProvider->reconnectSession(); + + // Create CMS Object for Comms + cms::Session* session( cmsProvider->getSession() ); + cms::MessageConsumer* consumer = cmsProvider->getConsumer(); + cms::MessageProducer* producer = cmsProvider->getProducer(); + producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT ); + + auto_ptr<cms::TextMessage> txtMessage( session->createTextMessage( "TEST MESSAGE" ) ); + + // Send some text messages + producer->send( txtMessage.get() ); + + auto_ptr<cms::Message> message( consumer->receive( 1000 ) ); + CPPUNIT_ASSERT( message.get() != NULL ); +} http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h index 3be5f00..1d399a1 100644 --- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h +++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h @@ -50,6 +50,7 @@ namespace openwire{ CPPUNIT_TEST( testLibraryInitShutdownInit ); CPPUNIT_TEST( testBytesMessageSendRecvAsync ); CPPUNIT_TEST( testMessageIdSetOnSend ); + CPPUNIT_TEST( testReceiveWithSessionSyncDispatch ); CPPUNIT_TEST_SUITE_END(); public: @@ -70,6 +71,7 @@ namespace openwire{ void tesstStreamMessage(); void testDestroyDestination(); void testMessageIdSetOnSend(); + void testReceiveWithSessionSyncDispatch(); };
