Author: tabish
Date: Mon Apr 1 21:56:01 2013
New Revision: 1463311
URL: http://svn.apache.org/r1463311
Log:
https://issues.apache.org/jira/browse/AMQCPP-473
https://issues.apache.org/jira/browse/AMQCPP-472
https://issues.apache.org/jira/browse/AMQCPP-471
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
Mon Apr 1 21:56:01 2013
@@ -22,6 +22,7 @@
#include <activemq/core/ActiveMQSession.h>
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/core/ActiveMQConnectionMetaData.h>
+#include <activemq/core/ActiveMQMessageAudit.h>
#include <activemq/core/AdvisoryConsumer.h>
#include <activemq/core/ConnectionAudit.h>
#include <activemq/core/kernels/ActiveMQSessionKernel.h>
@@ -164,13 +165,25 @@ namespace core{
bool dispatchAsync;
bool alwaysSyncSend;
bool useAsyncSend;
+ bool sendAcksAsync;
bool messagePrioritySupported;
bool watchTopicAdvisories;
bool useCompression;
+ bool useRetroactiveConsumer;
+ bool checkForDuplicates;
+ bool optimizeAcknowledge;
+ bool exclusiveConsumer;
+ bool transactedIndividualAck;
+ bool nonBlockingRedelivery;
int compressionLevel;
unsigned int sendTimeout;
unsigned int closeTimeout;
unsigned int producerWindowSize;
+ int auditDepth;
+ int auditMaximumProducerNumber;
+ long long optimizeAcknowledgeTimeOut;
+ long long optimizedAckScheduledAckInterval;
+ long long consumerFailoverRedeliveryWaitPeriod;
std::auto_ptr<PrefetchPolicy> defaultPrefetchPolicy;
std::auto_ptr<RedeliveryPolicy> defaultRedeliveryPolicy;
@@ -219,13 +232,25 @@ namespace core{
dispatchAsync(true),
alwaysSyncSend(false),
useAsyncSend(false),
+ sendAcksAsync(true),
messagePrioritySupported(true),
watchTopicAdvisories(true),
useCompression(false),
+ useRetroactiveConsumer(false),
+ checkForDuplicates(true),
+ optimizeAcknowledge(false),
+ exclusiveConsumer(false),
+ transactedIndividualAck(false),
+ nonBlockingRedelivery(false),
compressionLevel(-1),
sendTimeout(0),
closeTimeout(15000),
producerWindowSize(0),
+
auditDepth(ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE),
+
auditMaximumProducerNumber(ActiveMQMessageAudit::MAXIMUM_PRODUCER_COUNT),
+ optimizeAcknowledgeTimeOut(300),
+ optimizedAckScheduledAckInterval(0),
+ consumerFailoverRedeliveryWaitPeriod(0),
defaultPrefetchPolicy(NULL),
defaultRedeliveryPolicy(NULL),
exceptionListener(NULL),
@@ -560,6 +585,7 @@ void ActiveMQConnection::removeSession(P
this->config->sessionsLock.writeLock().lock();
try {
this->config->activeSessions.remove(session);
+ this->config->connectionAudit.removeDispatcher(session.get());
this->config->sessionsLock.writeLock().unlock();
} catch (Exception& ex) {
this->config->sessionsLock.writeLock().unlock();
@@ -1668,6 +1694,126 @@ void ActiveMQConnection::setWatchTopicAd
}
////////////////////////////////////////////////////////////////////////////////
+int ActiveMQConnection::getAuditDepth() const {
+ return this->config->auditDepth;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setAuditDepth(int auditDepth) {
+ this->config->auditDepth = auditDepth;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ActiveMQConnection::getAuditMaximumProducerNumber() const {
+ return this->config->auditMaximumProducerNumber;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setAuditMaximumProducerNumber(int
auditMaximumProducerNumber) {
+ this->config->auditMaximumProducerNumber = auditMaximumProducerNumber;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isCheckForDuplicates() const {
+ return this->config->checkForDuplicates;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setCheckForDuplicates(bool checkForDuplicates) {
+ this->config->checkForDuplicates = checkForDuplicates;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isSendAcksAsync() const {
+ return this->config->sendAcksAsync;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setSendAcksAsync(bool sendAcksAsync) {
+ this->config->sendAcksAsync = sendAcksAsync;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isTransactedIndividualAck() const {
+ return this->config->transactedIndividualAck;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setTransactedIndividualAck(bool
transactedIndividualAck) {
+ this->config->transactedIndividualAck = transactedIndividualAck;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isNonBlockingRedelivery() const {
+ return this->config->nonBlockingRedelivery;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setNonBlockingRedelivery(bool nonBlockingRedelivery) {
+ this->config->nonBlockingRedelivery = nonBlockingRedelivery;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isOptimizeAcknowledge() const {
+ return this->config->optimizeAcknowledge;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setOptimizeAcknowledge(bool optimizeAcknowledge) {
+ this->config->optimizeAcknowledge = optimizeAcknowledge;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConnection::getOptimizeAcknowledgeTimeOut() const {
+ return this->config->optimizeAcknowledgeTimeOut;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setOptimizeAcknowledgeTimeOut(long long
optimizeAcknowledgeTimeOut) {
+ this->config->optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConnection::getOptimizedAckScheduledAckInterval() const {
+ return this->config->optimizedAckScheduledAckInterval;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setOptimizedAckScheduledAckInterval(long long
optimizedAckScheduledAckInterval) {
+ this->config->optimizedAckScheduledAckInterval =
optimizedAckScheduledAckInterval;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConnection::getConsumerFailoverRedeliveryWaitPeriod() const {
+ return this->config->consumerFailoverRedeliveryWaitPeriod;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setConsumerFailoverRedeliveryWaitPeriod(long long
value) {
+ this->config->consumerFailoverRedeliveryWaitPeriod = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isUseRetroactiveConsumer() const {
+ return this->config->useRetroactiveConsumer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setUseRetroactiveConsumer(bool
useRetroactiveConsumer) {
+ this->config->useRetroactiveConsumer = useRetroactiveConsumer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isExclusiveConsumer() const {
+ return this->config->exclusiveConsumer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setExclusiveConsumer(bool exclusiveConsumer) {
+ this->config->exclusiveConsumer = exclusiveConsumer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::addTempDestination(Pointer<ActiveMQTempDestination>
destination) {
this->config->activeTempDestinations.put(destination, destination);
}
@@ -1759,3 +1905,18 @@ bool ActiveMQConnection::isDeleted(Point
return !this->config->activeTempDestinations.containsKey(destination);
}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isDuplicate(Dispatcher* dispatcher,
Pointer<commands::Message> message) {
+
+ if (this->config->checkForDuplicates) {
+ return this->config->connectionAudit.isDuplicate(dispatcher, message);
+ }
+
+ return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::rollbackDuplicate(Dispatcher* dispatcher,
Pointer<commands::Message> message) {
+ this->config->connectionAudit.rollbackDuplicate(dispatcher, message);
+}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
Mon Apr 1 21:56:01 2013
@@ -238,6 +238,28 @@ namespace core{
*/
virtual void destroyDestination(const cms::Destination* destination);
+ /**
+ * Allows Consumers to check if an incoming Message is a Duplicate.
+ *
+ * @param dispatcher
+ * The Dispatcher that is checking the Message for Duplication.
+ * @param message
+ * The Message that should be checked.
+ *
+ * @returns true if the Message was seen before.
+ */
+ bool isDuplicate(Dispatcher* dispatcher, Pointer<commands::Message>
message);
+
+ /**
+ * Mark message as received.
+ *
+ * @param dispatcher
+ * The Dispatcher instance that has received the Message.
+ * @param message
+ * The Message that has been received.
+ */
+ void rollbackDuplicate(Dispatcher* dispatcher,
Pointer<commands::Message> message);
+
public: // Connection Interface Methods
/**
@@ -543,6 +565,222 @@ namespace core{
*/
void setWatchTopicAdvisories(bool value);
+ /**
+ * Get the audit depth for Messages for consumers when using a fault
+ * tolerant transport. The higher the value the more messages are
checked
+ * for duplication, and the larger the performance impact of duplicate
+ * detection will be.
+ *
+ * @returns the configured audit depth.
+ */
+ int getAuditDepth() const;
+
+ /**
+ * Set the audit depth for Messages for consumers when using a fault
+ * tolerant transport. The higher the value the more messages are
checked
+ * for duplication, and the larger the performance impact of duplicate
+ * detection will be.
+ *
+ * @param auditDepth
+ * The configured audit depth.
+ */
+ void setAuditDepth(int auditDepth);
+
+ /**
+ * The number of Producers that will be audited.
+ *
+ * @returns the configured number of producers to include in the audit.
+ */
+ int getAuditMaximumProducerNumber() const;
+
+ /**
+ * The number of Producers that will be audited.
+ *
+ * @param auditMaximumProducerNumber
+ * The configured number of producers to include in the audit.
+ */
+ void setAuditMaximumProducerNumber(int auditMaximumProducerNumber);
+
+ /**
+ * Gets the value of the configured Duplicate Message detection
feature.
+ *
+ * When enabled and a fault tolerant transport is used (think
failover) then
+ * this feature will help to detect and filter duplicate messages that
might
+ * otherwise be delivered to a consumer after a connection failure.
+ *
+ * Disabling this can increase performance since no Message auditing
will
+ * occur.
+ *
+ * @return the checkForDuplicates value currently set.
+ */
+ bool isCheckForDuplicates() const;
+
+ /**
+ * Gets the value of the configured Duplicate Message detection
feature.
+ *
+ * When enabled and a fault tolerant transport is used (think
failover) then
+ * this feature will help to detect and filter duplicate messages that
might
+ * otherwise be delivered to a consumer after a connection failure.
+ *
+ * Disabling this can increase performance since no Message auditing
will
+ * occur.
+ *
+ * @param checkForDuplicates
+ * The checkForDuplicates value to be configured.
+ */
+ void setCheckForDuplicates(bool checkForDuplicates);
+
+ /**
+ * when true, submit individual transacted acks immediately rather
than with transaction
+ * completion. This allows the acks to represent delivery status
which can be persisted on
+ * rollback Used in conjunction with KahaDB set to Rewrite On
Redelivery.
+ *
+ * @returns true if this option is enabled.
+ */
+ bool isTransactedIndividualAck() const;
+
+ /**
+ * when true, submit individual transacted acks immediately rather
than with transaction
+ * completion. This allows the acks to represent delivery status
which can be persisted on
+ * rollback Used in conjunction with KahaDB set to Rewrite On
Redelivery.
+ *
+ * @param transactedIndividualAck
+ * The value to set.
+ */
+ void setTransactedIndividualAck(bool transactedIndividualAck);
+
+ /**
+ * Returns true if non-blocking redelivery of Messages is configured
for Consumers
+ * that are rolled back or recovered.
+ *
+ * @return true if non-blocking redelivery is enabled.
+ */
+ bool isNonBlockingRedelivery() const;
+
+ /**
+ * When true a MessageConsumer will not stop Message delivery before
re-delivering Messages
+ * from a rolled back transaction. This implies that message order
will not be preserved and
+ * also will result in the TransactedIndividualAck option to be
enabled.
+ *
+ * @param nonBlockingRedelivery
+ * The value to configure for non-blocking redelivery.
+ */
+ void setNonBlockingRedelivery(bool nonBlockingRedelivery);
+
+ /**
+ * Gets the delay period for a consumer redelivery.
+ *
+ * @returns configured time delay in milliseconds.
+ */
+ long long getConsumerFailoverRedeliveryWaitPeriod() const;
+
+ /**
+ * Sets the delay period for a consumer redelivery.
+ *
+ * @param value
+ * The configured time delay in milliseconds.
+ */
+ void setConsumerFailoverRedeliveryWaitPeriod(long long value);
+
+ /**
+ * @return true if optimizeAcknowledge is enabled.
+ */
+ bool isOptimizeAcknowledge() const;
+
+ /**
+ * Sets if Consumers are configured to use Optimized Acknowledge by
default.
+ *
+ * @param optimizeAcknowledge
+ * The optimizeAcknowledge mode to set.
+ */
+ void setOptimizeAcknowledge(bool optimizeAcknowledge);
+
+ /**
+ * Gets the time between optimized ack batches in milliseconds.
+ *
+ * @returns time between optimized ack batches in Milliseconds.
+ */
+ long long getOptimizeAcknowledgeTimeOut() const;
+
+ /**
+ * The max time in milliseconds between optimized ack batches.
+ *
+ * @param optimizeAcknowledgeTimeOut
+ * The time in milliseconds for optimized ack batches.
+ */
+ void setOptimizeAcknowledgeTimeOut(long long
optimizeAcknowledgeTimeOut);
+
+ /**
+ * Gets the configured time interval that is used to force all
MessageConsumers that have
+ * optimizedAcknowledge enabled to send an ack for any outstanding
Message Acks. By default
+ * this value is set to zero meaning that the consumers will not do
any background Message
+ * acknowledgment.
+ *
+ * @return the scheduledOptimizedAckInterval
+ */
+ long long getOptimizedAckScheduledAckInterval() const;
+
+ /**
+ * Sets the amount of time between scheduled sends of any outstanding
Message Acks for
+ * consumers that have been configured with optimizeAcknowledge
enabled.
+ *
+ * Time is given in Milliseconds.
+ *
+ * @param optimizedAckScheduledAckInterval
+ * The scheduledOptimizedAckInterval to use for new Consumers.
+ */
+ void setOptimizedAckScheduledAckInterval(long long
optimizedAckScheduledAckInterval);
+
+ /**
+ * Should all created consumers be retroactive.
+ *
+ * @returns true if consumer will be created with the retroactive flag
set.
+ */
+ bool isUseRetroactiveConsumer() const;
+
+ /**
+ * Sets whether or not retroactive consumers are enabled. Retroactive
+ * consumers allow non-durable topic subscribers to receive old
messages
+ * that were published before the non-durable subscriber started.
+ *
+ * @param useRetroactiveConsumer
+ * The value of this configuration option.
+ */
+ void setUseRetroactiveConsumer(bool useRetroactiveConsumer);
+
+ /**
+ * Should all created consumers be exclusive.
+ *
+ * @returns true if consumer will be created with the exclusive flag
set.
+ */
+ bool isExclusiveConsumer() const;
+
+ /**
+ * Enables or disables whether or not queue consumers should be
exclusive or
+ * not for example to preserve ordering when not using Message Groups.
+ *
+ * @param exclusiveConsumer
+ * The value of this configuration option.
+ */
+ void setExclusiveConsumer(bool exclusiveConsumer);
+
+ /**
+ * Returns whether Message acknowledgments are sent asynchronously
meaning no
+ * response is required from the broker before the ack completes.
+ *
+ * @return the sendAcksAsync configured value.
+ */
+ bool isSendAcksAsync() const;
+
+ /**
+ * Sets whether Message acknowledgments are sent asynchronously
meaning no
+ * response is required from the broker before the ack completes.
+ *
+ * @param sendAcksAsync
+ * The sendAcksAsync configuration value to set.
+ */
+ void setSendAcksAsync(bool sendAcksAsync);
+
public: // TransportListener
/**
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
Mon Apr 1 21:56:01 2013
@@ -22,6 +22,7 @@
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/lang/Boolean.h>
#include <decaf/lang/Integer.h>
+#include <decaf/lang/Long.h>
#include <decaf/lang/Pointer.h>
#include <decaf/lang/Math.h>
#include <decaf/lang/exceptions/NullPointerException.h>
@@ -29,6 +30,7 @@
#include <activemq/transport/TransportRegistry.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/core/ActiveMQConstants.h>
+#include <activemq/core/ActiveMQMessageAudit.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq/core/policies/DefaultRedeliveryPolicy.h>
#include <activemq/util/URISupport.h>
@@ -77,13 +79,25 @@ namespace core{
bool dispatchAsync;
bool alwaysSyncSend;
bool useAsyncSend;
+ bool sendAcksAsync;
bool messagePrioritySupported;
bool useCompression;
+ bool useRetroactiveConsumer;
bool watchTopicAdvisories;
+ bool checkForDuplicates;
+ bool optimizeAcknowledge;
+ bool exclusiveConsumer;
+ bool transactedIndividualAck;
+ bool nonBlockingRedelivery;
int compressionLevel;
unsigned int sendTimeout;
unsigned int closeTimeout;
unsigned int producerWindowSize;
+ int auditDepth;
+ int auditMaximumProducerNumber;
+ long long optimizeAcknowledgeTimeOut;
+ long long optimizedAckScheduledAckInterval;
+ long long consumerFailoverRedeliveryWaitPeriod;
cms::ExceptionListener* defaultListener;
cms::MessageTransformer* defaultTransformer;
@@ -99,13 +113,25 @@ namespace core{
dispatchAsync(true),
alwaysSyncSend(false),
useAsyncSend(false),
+ sendAcksAsync(true),
messagePrioritySupported(true),
useCompression(false),
+ useRetroactiveConsumer(false),
watchTopicAdvisories(true),
+ checkForDuplicates(true),
+ optimizeAcknowledge(false),
+ exclusiveConsumer(false),
+ transactedIndividualAck(false),
+ nonBlockingRedelivery(false),
compressionLevel(-1),
sendTimeout(0),
closeTimeout(15000),
producerWindowSize(0),
+
auditDepth(ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE),
+
auditMaximumProducerNumber(ActiveMQMessageAudit::MAXIMUM_PRODUCER_COUNT),
+ optimizeAcknowledgeTimeOut(300),
+ optimizedAckScheduledAckInterval(0),
+ consumerFailoverRedeliveryWaitPeriod(0),
defaultListener(NULL),
defaultTransformer(NULL),
defaultPrefetchPolicy(new DefaultPrefetchPolicy()),
@@ -118,15 +144,12 @@ namespace core{
this->properties->clear();
if (uri.getQuery() != "") {
-
// Not a composite URI so this works fine.
try {
URISupport::parseQuery(uri.getQuery(), properties.get());
} catch (URISyntaxException& ex) {
}
-
} else {
-
// Composite URI won't indicate it has a query even if it does.
try {
CompositeData composite = URISupport::parseComposite(uri);
@@ -139,45 +162,56 @@ namespace core{
this->alwaysSyncSend = Boolean::parseBoolean(
properties->getProperty(core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_ALWAYSSYNCSEND),
"false"));
-
this->useAsyncSend = Boolean::parseBoolean(
properties->getProperty(core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_USEASYNCSEND),
"false"));
-
this->useCompression = Boolean::parseBoolean(
properties->getProperty(core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_USECOMPRESSION),
"false"));
-
this->compressionLevel = Integer::parseInt(
properties->getProperty("connection.compressionLevel", "-1"));
-
this->messagePrioritySupported = Boolean::parseBoolean(
properties->getProperty("connection.messagePrioritySupported",
"true"));
-
+ this->messagePrioritySupported = Boolean::parseBoolean(
+ properties->getProperty("connection.checkForDuplicates",
"true"));
+ this->messagePrioritySupported = Integer::parseInt(
+ properties->getProperty("connection.auditDepth", "2048"));
+ this->messagePrioritySupported = Integer::parseInt(
+
properties->getProperty("connection.auditMaximumProducerNumber", "64"));
this->dispatchAsync = Boolean::parseBoolean(
properties->getProperty(core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_DISPATCHASYNC),
"true"));
-
this->producerWindowSize = Integer::parseInt(
properties->getProperty(core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_PRODUCERWINDOWSIZE),
"0"));
-
this->sendTimeout = decaf::lang::Integer::parseInt(
properties->getProperty(core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_SENDTIMEOUT), "0"));
-
this->closeTimeout = decaf::lang::Integer::parseInt(
properties->getProperty(core::ActiveMQConstants::toString(
core::ActiveMQConstants::CONNECTION_CLOSETIMEOUT),
"15000"));
-
this->clientId = properties->getProperty(
core::ActiveMQConstants::toString(core::ActiveMQConstants::PARAM_CLIENTID),
clientId);
-
this->username = properties->getProperty(
core::ActiveMQConstants::toString(core::ActiveMQConstants::PARAM_USERNAME),
username);
-
this->password = properties->getProperty(
core::ActiveMQConstants::toString(core::ActiveMQConstants::PARAM_PASSWORD),
password);
+ this->optimizeAcknowledge = Boolean::parseBoolean(
+ properties->getProperty("connection.optimizeAcknowledge",
"false"));
+ this->optimizeAcknowledge = Boolean::parseBoolean(
+ properties->getProperty("connection.exclusiveConsumer",
"false"));
+ this->optimizeAcknowledge = Boolean::parseBoolean(
+ properties->getProperty("connection.transactedIndividualAck",
"false"));
+ this->optimizeAcknowledge = Boolean::parseBoolean(
+ properties->getProperty("connection.useRetroactiveConsumer",
"false"));
+ this->optimizeAcknowledge = Boolean::parseBoolean(
+ properties->getProperty("connection.sendAcksAsync", "true"));
+ this->messagePrioritySupported = Long::parseLong(
+
properties->getProperty("connection.optimizeAcknowledgeTimeOut", "300"));
+ this->messagePrioritySupported = Long::parseLong(
+
properties->getProperty("connection.optimizedAckScheduledAckInterval", "0"));
+ this->messagePrioritySupported = Long::parseLong(
+
properties->getProperty("connection.consumerFailoverRedeliveryWaitPeriod",
"0"));
this->defaultPrefetchPolicy->configure(*properties);
this->defaultRedeliveryPolicy->configure(*properties);
@@ -361,6 +395,9 @@ void ActiveMQConnectionFactory::configur
connection->setRedeliveryPolicy(this->settings->defaultRedeliveryPolicy->clone());
connection->setMessagePrioritySupported(this->settings->messagePrioritySupported);
connection->setWatchTopicAdvisories(this->settings->watchTopicAdvisories);
+ connection->setCheckForDuplicates(this->settings->checkForDuplicates);
+ connection->setAuditDepth(this->settings->auditDepth);
+
connection->setAuditMaximumProducerNumber(this->settings->auditMaximumProducerNumber);
if (this->settings->defaultListener) {
connection->setExceptionListener(this->settings->defaultListener);
@@ -489,6 +526,16 @@ void ActiveMQConnectionFactory::setUseAs
}
////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isSendAcksAsync() const {
+ return this->settings->sendAcksAsync;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setSendAcksAsync(bool sendAcksAsync) {
+ this->settings->sendAcksAsync = sendAcksAsync;
+}
+
+////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConnectionFactory::isUseCompression() const {
return this->settings->useCompression;
}
@@ -562,3 +609,113 @@ bool ActiveMQConnectionFactory::isWatchT
void ActiveMQConnectionFactory::setWatchTopicAdvisories(bool value) {
this->settings->watchTopicAdvisories = value;
}
+
+////////////////////////////////////////////////////////////////////////////////
+int ActiveMQConnectionFactory::getAuditDepth() const {
+ return this->settings->auditDepth;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setAuditDepth(int auditDepth) {
+ this->settings->auditDepth = auditDepth;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ActiveMQConnectionFactory::getAuditMaximumProducerNumber() const {
+ return this->settings->auditMaximumProducerNumber;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setAuditMaximumProducerNumber(int
auditMaximumProducerNumber) {
+ this->settings->auditMaximumProducerNumber = auditMaximumProducerNumber;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isCheckForDuplicates() const {
+ return this->settings->checkForDuplicates;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setCheckForDuplicates(bool checkForDuplicates)
{
+ this->settings->checkForDuplicates = checkForDuplicates;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isTransactedIndividualAck() const {
+ return this->settings->transactedIndividualAck;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setTransactedIndividualAck(bool
transactedIndividualAck) {
+ this->settings->transactedIndividualAck = transactedIndividualAck;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isNonBlockingRedelivery() const {
+ return this->settings->nonBlockingRedelivery;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setNonBlockingRedelivery(bool
nonBlockingRedelivery) {
+ this->settings->nonBlockingRedelivery = nonBlockingRedelivery;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isOptimizeAcknowledge() const {
+ return this->settings->optimizeAcknowledge;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setOptimizeAcknowledge(bool
optimizeAcknowledge) {
+ this->settings->optimizeAcknowledge = optimizeAcknowledge;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConnectionFactory::getOptimizeAcknowledgeTimeOut() const {
+ return this->settings->optimizeAcknowledgeTimeOut;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setOptimizeAcknowledgeTimeOut(long long
optimizeAcknowledgeTimeOut) {
+ this->settings->optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConnectionFactory::getOptimizedAckScheduledAckInterval()
const {
+ return this->settings->optimizedAckScheduledAckInterval;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setOptimizedAckScheduledAckInterval(long long
optimizedAckScheduledAckInterval) {
+ this->settings->optimizedAckScheduledAckInterval =
optimizedAckScheduledAckInterval;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConnectionFactory::getConsumerFailoverRedeliveryWaitPeriod()
const {
+ return this->settings->consumerFailoverRedeliveryWaitPeriod;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setConsumerFailoverRedeliveryWaitPeriod(long
long value) {
+ this->settings->consumerFailoverRedeliveryWaitPeriod = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isUseRetroactiveConsumer() const {
+ return this->settings->useRetroactiveConsumer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setUseRetroactiveConsumer(bool
useRetroactiveConsumer) {
+ this->settings->useRetroactiveConsumer = useRetroactiveConsumer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isExclusiveConsumer() const {
+ return this->settings->exclusiveConsumer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setExclusiveConsumer(bool exclusiveConsumer) {
+ this->settings->exclusiveConsumer = exclusiveConsumer;
+}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
Mon Apr 1 21:56:01 2013
@@ -40,12 +40,11 @@ namespace core{
class AMQCPP_API ActiveMQConnectionFactory : public cms::ConnectionFactory
{
public:
- // Default Broker URI if none specified
+ // Default Broker URI if none specified
'failover:tcp://localhost:61616'
static const std::string DEFAULT_URI;
private:
- // d-Pointer holding pre-configured factory settings
FactorySettings* settings;
private:
@@ -59,9 +58,13 @@ namespace core{
/**
* Constructor
- * @param url the URI of the Broker we are connecting to.
- * @param username to authenticate with, defaults to ""
- * @param password to authenticate with, defaults to ""
+ *
+ * @param uri
+ * The URI of the Broker we are connecting to.
+ * @param username
+ * The user name to authenticate with this connection.
+ * @param password
+ * The password to authenticate with this connection.
*/
ActiveMQConnectionFactory(const std::string& uri,
const std::string& username = "",
@@ -69,9 +72,13 @@ namespace core{
/**
* Constructor
- * @param uri the URI of the Broker we are connecting to.
- * @param username to authenticate with, defaults to ""
- * @param password to authenticate with, defaults to ""
+ *
+ * @param uri
+ * The URI of the Broker we are connecting to.
+ * @param username
+ * The user name to authenticate with this connection.
+ * @param password
+ * The password to authenticate with this connection.
*/
ActiveMQConnectionFactory(const decaf::net::URI& uri,
const std::string& username = "",
@@ -84,8 +91,10 @@ namespace core{
* connection is created in stopped mode. No messages will be
* delivered until the Connection.start method is explicitly
* called.
+ *
* @returns a Connection Pointer
- * @throws CMSException
+ *
+ * @throws CMSException if an error occurs.
*/
virtual cms::Connection* createConnection();
@@ -93,14 +102,20 @@ namespace core{
* Creates a connection with the specified user identity. The
* connection is created in stopped mode. No messages will be
* delivered until the Connection.start method is explicitly
- * called. The username and password values passed here do not
+ * called. The user name and password values passed here do not
* change the defaults, subsequent calls to the parameterless
* createConnection will continue to use the default values that
* were set in the Constructor.
- * @param username to authenticate with
- * @param password to authenticate with
+ *
+ * @param username
+ * The user name to authenticate with this connection.
+ * @param password
+ * The password to authenticate with this connection.
+ *
* @returns a Connection Pointer
- * @throws CMSException
+ *
+ * @throws CMSSecurityException if the user credentials are invalid.
+ * @throws CMSException if an error occurs.
*/
virtual cms::Connection* createConnection(const std::string& username,
const std::string& password);
@@ -113,12 +128,19 @@ namespace core{
* change the defaults, subsequent calls to the parameterless
* createConnection will continue to use the default values that
* were set in the Constructor.
- * @param username to authenticate with
- * @param password to authenticate with
- * @param clientId to assign to connection if "" then a random cleint
- * Id is created for this connection.
+ *
+ * @param username
+ * The user name to authenticate with this connection.
+ * @param password
+ * The password to authenticate with this connection.
+ * @param clientId
+ * The client Id to assign to connection if "" then a random
client
+ * Id is created for this connection.
+ *
* @returns a Connection Pointer
- * @throws CMSException
+ *
+ * @throws CMSSecurityException if the user credentials are invalid.
+ * @throws CMSException if an error occurs.
*/
virtual cms::Connection* createConnection(const std::string& username,
const std::string& password,
@@ -297,6 +319,23 @@ namespace core{
void setUseAsyncSend(bool value);
/**
+ * Returns whether Message acknowledgments are sent asynchronously
meaning no
+ * response is required from the broker before the ack completes.
+ *
+ * @return the sendAcksAsync configured value. (defaults to true)
+ */
+ bool isSendAcksAsync() const;
+
+ /**
+ * Sets whether Message acknowledgments are sent asynchronously
meaning no
+ * response is required from the broker before the ack completes.
+ *
+ * @param sendAcksAsync
+ * The sendAcksAsync configuration value to set.
+ */
+ void setSendAcksAsync(bool sendAcksAsync);
+
+ /**
* Gets if the Connection is configured for Message body compression.
* @returns if the Message body will be Compressed or not.
*/
@@ -386,6 +425,39 @@ namespace core{
void setMessagePrioritySupported(bool value);
/**
+ * Should all created consumers be retroactive.
+ *
+ * @returns true if consumer will be created with the retroactive flag
set.
+ */
+ bool isUseRetroactiveConsumer() const;
+
+ /**
+ * Sets whether or not retroactive consumers are enabled. Retroactive
+ * consumers allow non-durable topic subscribers to receive old
messages
+ * that were published before the non-durable subscriber started.
+ *
+ * @param useRetroactiveConsumer
+ * The value of this configuration option.
+ */
+ void setUseRetroactiveConsumer(bool useRetroactiveConsumer);
+
+ /**
+ * Should all created consumers be exclusive.
+ *
+ * @returns true if consumer will be created with the exclusive flag
set.
+ */
+ bool isExclusiveConsumer() const;
+
+ /**
+ * Enables or disables whether or not queue consumers should be
exclusive or
+ * not for example to preserve ordering when not using Message Groups.
+ *
+ * @param exclusiveConsumer
+ * The value of this configuration option.
+ */
+ void setExclusiveConsumer(bool exclusiveConsumer);
+
+ /**
* Is the Connection created by this factory configured to watch for
advisory messages
* that inform the Connection about temporary destination create /
destroy.
*
@@ -402,6 +474,172 @@ namespace core{
*/
void setWatchTopicAdvisories(bool value);
+ /**
+ * Get the audit depth for Messages for consumers when using a fault
+ * tolerant transport. The higher the value the more messages are
checked
+ * for duplication, and the larger the performance impact of duplicate
+ * detection will be.
+ *
+ * @returns the configured audit depth.
+ */
+ int getAuditDepth() const;
+
+ /**
+ * Set the audit depth for Messages for consumers when using a fault
+ * tolerant transport. The higher the value the more messages are
checked
+ * for duplication, and the larger the performance impact of duplicate
+ * detection will be.
+ *
+ * @param auditDepth
+ * The configured audit depth.
+ */
+ void setAuditDepth(int auditDepth);
+
+ /**
+ * The number of Producers that will be audited.
+ *
+ * @returns the configured number of producers to include in the audit.
+ */
+ int getAuditMaximumProducerNumber() const;
+
+ /**
+ * The number of Producers that will be audited.
+ *
+ * @param auditMaximumProducerNumber
+ * The configured number of producers to include in the audit.
+ */
+ void setAuditMaximumProducerNumber(int auditMaximumProducerNumber);
+
+ /**
+ * Gets the value of the configured Duplicate Message detection
feature.
+ *
+ * When enabled and a fault tolerant transport is used (think
failover) then
+ * this feature will help to detect and filter duplicate messages that
might
+ * otherwise be delivered to a consumer after a connection failure.
+ *
+ * Disabling this can increase performance since no Message auditing
will
+ * occur.
+ *
+ * @return the checkForDuplicates value currently set.
+ */
+ bool isCheckForDuplicates() const;
+
+ /**
+ * Gets the value of the configured Duplicate Message detection
feature.
+ *
+ * When enabled and a fault tolerant transport is used (think
failover) then
+ * this feature will help to detect and filter duplicate messages that
might
+ * otherwise be delivered to a consumer after a connection failure.
+ *
+ * Disabling this can increase performance since no Message auditing
will
+ * occur.
+ *
+ * @param checkForDuplicates
+ * The checkForDuplicates value to be configured.
+ */
+ void setCheckForDuplicates(bool checkForDuplicates);
+
+ /**
+ * when true, submit individual transacted acks immediately rather
than with transaction
+ * completion. This allows the acks to represent delivery status
which can be persisted on
+ * rollback Used in conjunction with KahaDB set to Rewrite On
Redelivery.
+ *
+ * @returns true if this option is enabled.
+ */
+ bool isTransactedIndividualAck() const;
+
+ /**
+ * when true, submit individual transacted acks immediately rather
than with transaction
+ * completion. This allows the acks to represent delivery status
which can be persisted on
+ * rollback Used in conjunction with KahaDB set to Rewrite On
Redelivery.
+ *
+ * @param transactedIndividualAck
+ * The value to set.
+ */
+ void setTransactedIndividualAck(bool transactedIndividualAck);
+
+ /**
+ * Returns true if non-blocking redelivery of Messages is configured
for Consumers
+ * that are rolled back or recovered.
+ *
+ * @return true if non-blocking redelivery is enabled.
+ */
+ bool isNonBlockingRedelivery() const;
+
+ /**
+ * When true a MessageConsumer will not stop Message delivery before
re-delivering Messages
+ * from a rolled back transaction. This implies that message order
will not be preserved and
+ * also will result in the TransactedIndividualAck option to be
enabled.
+ *
+ * @param nonBlockingRedelivery
+ * The value to configure for non-blocking redelivery.
+ */
+ void setNonBlockingRedelivery(bool nonBlockingRedelivery);
+
+ /**
+ * Gets the delay period for a consumer redelivery.
+ *
+ * @returns configured time delay in milliseconds.
+ */
+ long long getConsumerFailoverRedeliveryWaitPeriod() const;
+
+ /**
+ * Sets the delay period for a consumer redelivery.
+ *
+ * @param value
+ * The configured time delay in milliseconds.
+ */
+ void setConsumerFailoverRedeliveryWaitPeriod(long long value);
+
+ /**
+ * @return true if optimizeAcknowledge is enabled.
+ */
+ bool isOptimizeAcknowledge() const;
+
+ /**
+ * Sets if Consumers are configured to use Optimized Acknowledge by
default.
+ *
+ * @param optimizeAcknowledge
+ * The optimizeAcknowledge mode to set.
+ */
+ void setOptimizeAcknowledge(bool optimizeAcknowledge);
+
+ /**
+ * Gets the time between optimized ack batches in milliseconds.
+ *
+ * @returns time between optimized ack batches in Milliseconds.
+ */
+ long long getOptimizeAcknowledgeTimeOut() const;
+
+ /**
+ * The max time in milliseconds between optimized ack batches.
+ *
+ * @param optimizeAcknowledgeTimeOut
+ * The time in milliseconds for optimized ack batches.
+ */
+ void setOptimizeAcknowledgeTimeOut(long long
optimizeAcknowledgeTimeOut);
+
+ /**
+ * Gets the configured time interval that is used to force all
MessageConsumers that have
+ * optimizedAcknowledge enabled to send an ack for any outstanding
Message Acks. By default
+ * this value is set to zero meaning that the consumers will not do
any background Message
+ * acknowledgment.
+ *
+ * @return the scheduledOptimizedAckInterval
+ */
+ long long getOptimizedAckScheduledAckInterval() const;
+
+ /**
+ * Sets the amount of time between scheduled sends of any outstanding
Message Acks for
+ * consumers that have been configured with optimizeAcknowledge
enabled.
+ *
+ * Time is given in Milliseconds.
+ *
+ * @param optimizedAckScheduledAckInterval
+ * The scheduledOptimizedAckInterval to use for new Consumers.
+ */
+ void setOptimizedAckScheduledAckInterval(long long
optimizedAckScheduledAckInterval);
+
public:
/**
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
Mon Apr 1 21:56:01 2013
@@ -19,6 +19,7 @@
#include <decaf/util/LinkedHashMap.h>
+#include <activemq/core/Dispatcher.h>
#include <activemq/core/ActiveMQMessageAudit.h>
#include <activemq/commands/ActiveMQDestination.h>
@@ -48,7 +49,7 @@ namespace core {
Mutex mutex;
LinkedHashMap<Pointer<ActiveMQDestination>,
Pointer<ActiveMQMessageAudit> > destinations;
- LinkedHashMap<Pointer<Dispatcher>, Pointer<ActiveMQMessageAudit> >
dispatchers;
+ LinkedHashMap<Dispatcher*, Pointer<ActiveMQMessageAudit> > dispatchers;
ConnectionAuditImpl() : mutex(), destinations(1000), dispatchers(1000)
{
}
@@ -79,14 +80,14 @@ ConnectionAudit::~ConnectionAudit() {
}
////////////////////////////////////////////////////////////////////////////////
-void ConnectionAudit::removeDispatcher(Pointer<Dispatcher> dispatcher) {
+void ConnectionAudit::removeDispatcher(Dispatcher* dispatcher) {
synchronized(&this->impl->mutex) {
this->impl->dispatchers.remove(dispatcher);
}
}
////////////////////////////////////////////////////////////////////////////////
-bool ConnectionAudit::isDuplicate(Pointer<Dispatcher> dispatcher,
Pointer<commands::Message> message) {
+bool ConnectionAudit::isDuplicate(Dispatcher* dispatcher,
Pointer<commands::Message> message) {
if (checkForDuplicates && message != NULL) {
Pointer<ActiveMQDestination> destination = message->getDestination();
@@ -117,7 +118,7 @@ bool ConnectionAudit::isDuplicate(Pointe
}
////////////////////////////////////////////////////////////////////////////////
-void ConnectionAudit::rollbackDuplicate(Pointer<Dispatcher> dispatcher,
Pointer<commands::Message> message) {
+void ConnectionAudit::rollbackDuplicate(Dispatcher* dispatcher,
Pointer<commands::Message> message) {
if (checkForDuplicates && message != NULL) {
Pointer<ActiveMQDestination> destination = message->getDestination();
if (destination != NULL) {
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h
Mon Apr 1 21:56:01 2013
@@ -21,12 +21,13 @@
#include <activemq/util/Config.h>
#include <activemq/commands/Message.h>
-#include <activemq/core/Dispatcher.h>
+#include <decaf/lang/Pointer.h>
namespace activemq {
namespace core {
class ConnectionAuditImpl;
+ class Dispatcher;
/**
* Provides the Auditing functionality used by Connections to attempt to
@@ -58,11 +59,11 @@ namespace core {
public:
- void removeDispatcher(Pointer<Dispatcher> dispatcher);
+ void removeDispatcher(Dispatcher* dispatcher);
- bool isDuplicate(Pointer<Dispatcher> dispatcher,
Pointer<commands::Message> message);
+ bool isDuplicate(Dispatcher* dispatcher,
decaf::lang::Pointer<commands::Message> message);
- void rollbackDuplicate(Pointer<Dispatcher> dispatcher,
Pointer<commands::Message> message);
+ void rollbackDuplicate(Dispatcher* dispatcher,
decaf::lang::Pointer<commands::Message> message);
public:
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h
Mon Apr 1 21:56:01 2013
@@ -25,8 +25,6 @@
namespace activemq {
namespace core {
- using decaf::lang::Pointer;
-
/**
* Interface for an object responsible for dispatching messages to
* consumers.
@@ -42,7 +40,7 @@ namespace core {
* @param message
* The message to be dispatched to a waiting consumer.
*/
- virtual void dispatch(const Pointer<commands::MessageDispatch>&
message) = 0;
+ virtual void dispatch(const
decaf::lang::Pointer<commands::MessageDispatch>& message) = 0;
/**
* HashCode method allowing Dispatcher instances to be used in HashMap
etc.