Author: gtully
Date: Wed Apr 21 16:45:30 2010
New Revision: 936390
URL: http://svn.apache.org/viewvc?rev=936390&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2704 - add message audit
to topic sub so that a regular sub can behave like store backed subscriptions
which already suppress duplicates. Dup ocurrs from ring network topology where
there are two equal and valid routes for a message, see test case
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=936390&r1=936389&r2=936390&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Wed Apr 21 16:45:30 2010
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.Atomi
import javax.jms.JMSException;
+import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
@@ -62,6 +63,12 @@ public class TopicSubscription extends A
private final AtomicLong dequeueCounter = new AtomicLong(0);
private int memoryUsageHighWaterMark = 95;
private boolean slowConsumer;
+
+ // allow duplicate suppression in a ring network of brokers
+ protected int maxProducersToAudit = 1024;
+ protected int maxAuditDepth = 1000;
+ protected boolean enableAudit = false;
+ protected ActiveMQMessageAudit audit;
public TopicSubscription(Broker broker,ConnectionContext context,
ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info);
@@ -78,9 +85,15 @@ public class TopicSubscription extends A
this.matched.setSystemUsage(usageManager);
this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
this.matched.start();
+ if (enableAudit) {
+ audit= new ActiveMQMessageAudit(maxAuditDepth,
maxProducersToAudit);
+ }
}
public void add(MessageReference node) throws Exception {
+ if (isDuplicate(node)) {
+ return;
+ }
enqueueCounter.incrementAndGet();
if (!isFull() && matched.isEmpty() && !isSlave()) {
// if maximumPendingMessages is set we will only discard messages
which
@@ -158,6 +171,19 @@ public class TopicSubscription extends A
}
}
+ private boolean isDuplicate(MessageReference node) {
+ boolean duplicate = false;
+ if (enableAudit && audit != null) {
+ duplicate = audit.isDuplicate(node);
+ if (LOG.isDebugEnabled()) {
+ if (duplicate) {
+ LOG.debug("ignoring duplicate add: " +
node.getMessageId());
+ }
+ }
+ }
+ return duplicate;
+ }
+
/**
* Discard any expired messages from the matched list. Called from a
* synchronized block.
@@ -313,6 +339,39 @@ public class TopicSubscription extends A
this.messageEvictionStrategy = messageEvictionStrategy;
}
+ public int getMaxProducersToAudit() {
+ return maxProducersToAudit;
+ }
+
+ public synchronized void setMaxProducersToAudit(int maxProducersToAudit) {
+ this.maxProducersToAudit = maxProducersToAudit;
+ if (audit != null) {
+ audit.setMaximumNumberOfProducersToTrack(maxProducersToAudit);
+ }
+ }
+
+ public int getMaxAuditDepth() {
+ return maxAuditDepth;
+ }
+
+ public synchronized void setMaxAuditDepth(int maxAuditDepth) {
+ this.maxAuditDepth = maxAuditDepth;
+ if (audit != null) {
+ audit.setAuditDepth(maxAuditDepth);
+ }
+ }
+
+ public boolean isEnableAudit() {
+ return enableAudit;
+ }
+
+ public synchronized void setEnableAudit(boolean enableAudit) {
+ this.enableAudit = enableAudit;
+ if (enableAudit && audit==null) {
+ audit = new
ActiveMQMessageAudit(maxAuditDepth,maxProducersToAudit);
+ }
+ }
+
// Implementation methods
//
-------------------------------------------------------------------------
public boolean isFull() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=936390&r1=936389&r2=936390&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
Wed Apr 21 16:45:30 2010
@@ -91,7 +91,9 @@ public abstract class AbstractStoreCurso
* the cache. If subsequently, we pull out that message from the
store (before its deleted)
* it will be a duplicate - but should be ignored
*/
-
//LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + "
cursor got duplicate: " + message);
+ if (LOG.isDebugEnabled()) {
+
LOG.debug(regionDestination.getActiveMQDestination().getPhysicalName() + "
cursor got duplicate: " + message);
+ }
storeHasMessages = true;
}
return recovered;
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=936390&r1=936389&r2=936390&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
Wed Apr 21 16:45:30 2010
@@ -178,6 +178,11 @@ public class PolicyEntry extends Destina
int maxBatchSize =
subscription.getConsumerInfo().getPrefetchSize();
subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(broker,name,
maxBatchSize));
}
+ if (enableAudit) {
+ subscription.setEnableAudit(enableAudit);
+ subscription.setMaxProducersToAudit(maxProducersToAudit);
+ subscription.setMaxAuditDepth(maxAuditDepth);
+ }
}
public void configure(Broker broker, SystemUsage memoryManager,
DurableTopicSubscription sub) {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=936390&r1=936389&r2=936390&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Wed Apr 21 16:45:30 2010
@@ -19,6 +19,7 @@ package org.apache.activemq.network;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
@@ -680,9 +681,9 @@ public abstract class DemandForwardingBr
final DemandSubscription sub =
subscriptionMapByLocalId.get(md.getConsumerId());
if (sub != null && md.getMessage() != null &&
sub.incrementOutstandingResponses()) {
- if (originallyCameFromRemote(md, sub)) {
+ if (suppressMessageDispatch(md, sub)) {
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + "
message not forwarded to " + remoteBrokerName + " because message came from
there or fails networkTTL: " + md.getMessage());
+ LOG.debug(configuration.getBrokerName() + "
message not forwarded to " + remoteBrokerName + " because message came from
there or fails networkTTL, brokerPath: " +
Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " +
md.getMessage());
}
// still ack as it may be durable
try {
@@ -695,7 +696,7 @@ public abstract class DemandForwardingBr
Message message = configureMessage(md);
if (LOG.isDebugEnabled()) {
- LOG.debug("bridging " +
configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message);
+ LOG.debug("bridging " +
configuration.getBrokerName() + " -> " + remoteBrokerName + ", brokerPath: " +
Arrays.toString(message.getBrokerPath()) + ", message: " + message);
}
if (!message.isResponseRequired()) {
@@ -776,25 +777,25 @@ public abstract class DemandForwardingBr
}
}
- private boolean originallyCameFromRemote(MessageDispatch md,
DemandSubscription sub) throws Exception {
+ private boolean suppressMessageDispatch(MessageDispatch md,
DemandSubscription sub) throws Exception {
// See if this consumer's brokerPath tells us it came from the broker
at the other end
// of the bridge. I think we should be making this decision based on
the message's
// broker bread crumbs and not the consumer's? However, the message's
broker bread
// crumbs are null, which is another matter.
- boolean cameFromRemote = false;
+ boolean suppress = false;
Object consumerInfo = md.getMessage().getDataStructure();
if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo)) {
- cameFromRemote = contains(((ConsumerInfo)
consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
+ suppress = contains(((ConsumerInfo) consumerInfo).getBrokerPath(),
remoteBrokerInfo.getBrokerId());
}
// for durable subs, suppression via filter leaves dangling acks so we
need to
// check here and allow the ack irrespective
- if (!cameFromRemote && sub.getLocalInfo().isDurable()) {
+ if (!suppress && sub.getLocalInfo().isDurable()) {
MessageEvaluationContext messageEvalContext = new
MessageEvaluationContext();
messageEvalContext.setMessageReference(md.getMessage());
- cameFromRemote =
!createNetworkBridgeFilter(null).matches(messageEvalContext);
+ suppress =
!createNetworkBridgeFilter(null).matches(messageEvalContext);
}
- return cameFromRemote;
+ return suppress;
}
/**
@@ -1154,7 +1155,7 @@ public abstract class DemandForwardingBr
sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
} else {
// need to ack this message if it is ignored as it is durable so
- // we check before we send. see: originallyCameFromRemote()
+ // we check before we send. see: suppressMessageDispatch()
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java?rev=936390&r1=936389&r2=936390&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/ThreeBrokerTopicNetworkTest.java
Wed Apr 21 16:45:30 2010
@@ -17,15 +17,21 @@
package org.apache.activemq.usecases;
import java.net.URI;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
+import javax.jms.Topic;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.transport.failover.FailoverUriTest;
+import org.apache.activemq.JmsMultipleBrokersTestSupport.BrokerItem;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.util.MessageIdList;
/**
@@ -242,6 +248,106 @@ public class ThreeBrokerTopicNetworkTest
assertEquals(MESSAGE_COUNT * 3, msgsC.getMessageCount());
}
+ public void testAllConnectedBrokerNetworkSingleProducerTTL() throws
Exception {
+
+ // duplicates are expected with ttl of 2 as each broker is connected
to the next
+ // but the dups are suppressed by the store and now also by the topic
sub when enableAudit
+ // default (true) is present in a matching destination policy entry
+ int networkTTL = 2;
+ boolean conduitSubs = true;
+ // Setup broker networks
+ bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL,
conduitSubs);
+ bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL,
conduitSubs);
+ bridgeBrokers("BrokerB", "BrokerC", dynamicOnly, networkTTL,
conduitSubs);
+ bridgeBrokers("BrokerC", "BrokerB", dynamicOnly, networkTTL,
conduitSubs);
+ bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL,
conduitSubs);
+ bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL,
conduitSubs);
+
+ PolicyMap policyMap = new PolicyMap();
+ // enable audit is on by default just need to give it matching policy
entry
+ // so it will be applied to the topic subscription
+ policyMap.setDefaultEntry(new PolicyEntry());
+ Collection<BrokerItem> brokerList = brokers.values();
+ for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
+ BrokerService broker = i.next().broker;
+ broker.setDestinationPolicy(policyMap);
+ broker.setDeleteAllMessagesOnStartup(true);
+ }
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", true);
+
+ // Setup consumers
+ MessageConsumer clientA = createConsumer("BrokerA", dest);
+ MessageConsumer clientB = createConsumer("BrokerB", dest);
+ MessageConsumer clientC = createConsumer("BrokerC", dest);
+ //let consumers propogate around the network
+ Thread.sleep(2000);
+
+ // Send messages
+ sendMessages("BrokerA", dest, 1);
+
+ // Get message count
+ MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+ MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+ MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+ msgsA.waitForMessagesToArrive(1);
+ msgsB.waitForMessagesToArrive(1);
+ msgsC.waitForMessagesToArrive(1);
+
+ // ensure we don't get any more messages
+ Thread.sleep(2000);
+
+ assertEquals(1, msgsA.getMessageCount());
+ assertEquals(1, msgsB.getMessageCount());
+ assertEquals(1, msgsC.getMessageCount());
+ }
+
+ public void testAllConnectedBrokerNetworkDurableSubTTL() throws Exception {
+ int networkTTL = 2;
+ boolean conduitSubs = true;
+ // Setup broker networks
+ bridgeBrokers("BrokerA", "BrokerB", dynamicOnly, networkTTL,
conduitSubs);
+ bridgeBrokers("BrokerB", "BrokerA", dynamicOnly, networkTTL,
conduitSubs);
+ bridgeBrokers("BrokerB", "BrokerC", dynamicOnly, networkTTL,
conduitSubs);
+ bridgeBrokers("BrokerC", "BrokerB", dynamicOnly, networkTTL,
conduitSubs);
+ bridgeBrokers("BrokerA", "BrokerC", dynamicOnly, networkTTL,
conduitSubs);
+ bridgeBrokers("BrokerC", "BrokerA", dynamicOnly, networkTTL,
conduitSubs);
+
+ startAllBrokers();
+
+ // Setup destination
+ Destination dest = createDestination("TEST.FOO", true);
+
+ // Setup consumers
+ MessageConsumer clientA = createDurableSubscriber("BrokerA",
(Topic)dest, "clientA");
+ MessageConsumer clientB = createDurableSubscriber("BrokerB",
(Topic)dest, "clientB");
+ MessageConsumer clientC = createDurableSubscriber("BrokerC",
(Topic)dest, "clientC");
+ //let consumers propogate around the network
+ Thread.sleep(2000);
+
+ // Send messages
+ sendMessages("BrokerA", dest, 1);
+
+ // Get message count
+ MessageIdList msgsA = getConsumerMessages("BrokerA", clientA);
+ MessageIdList msgsB = getConsumerMessages("BrokerB", clientB);
+ MessageIdList msgsC = getConsumerMessages("BrokerC", clientC);
+
+ msgsA.waitForMessagesToArrive(1);
+ msgsB.waitForMessagesToArrive(1);
+ msgsC.waitForMessagesToArrive(1);
+
+ // ensure we don't get any more messages
+ Thread.sleep(2000);
+
+ assertEquals(1, msgsA.getMessageCount());
+ assertEquals(1, msgsB.getMessageCount());
+ assertEquals(1, msgsC.getMessageCount());
+ }
+
/**
* BrokerA <-> BrokerB <-> BrokerC
*/
@@ -284,9 +390,10 @@ public class ThreeBrokerTopicNetworkTest
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
- createBroker(new
URI("broker:(tcp://localhost:61616)/BrokerA?persistent=false&useJmx=false"));
- createBroker(new
URI("broker:(tcp://localhost:61617)/BrokerB?persistent=false&useJmx=false"));
- createBroker(new
URI("broker:(tcp://localhost:61618)/BrokerC?persistent=false&useJmx=false"));
+ String options = new String("?persistent=false&useJmx=false");
+ createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" +
options));
+ createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" +
options));
+ createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" +
options));
}
public static Test suite() {