Author: dejanb
Date: Wed Oct 19 11:13:19 2011
New Revision: 1186095
URL: http://svn.apache.org/viewvc?rev=1186095&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3550 - local option for virtual topics
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java?rev=1186095&r1=1186094&r2=1186095&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java
Wed Oct 19 11:13:19 2011
@@ -16,10 +16,6 @@
*/
package org.apache.activemq.broker.region.virtual;
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
-
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
@@ -29,10 +25,14 @@ import org.apache.activemq.command.Messa
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
public class SelectorAwareVirtualTopicInterceptor extends
VirtualTopicInterceptor {
- public SelectorAwareVirtualTopicInterceptor(Destination next, String
prefix, String postfix) {
- super(next, prefix, postfix);
+ public SelectorAwareVirtualTopicInterceptor(Destination next, String
prefix, String postfix, boolean local) {
+ super(next, prefix, postfix, local);
}
/**
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java?rev=1186095&r1=1186094&r2=1186095&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java
Wed Oct 19 11:13:19 2011
@@ -40,6 +40,7 @@ public class VirtualTopic implements Vir
private String postfix = "";
private String name = ">";
private boolean selectorAware = false;
+ private boolean local = false;
public ActiveMQDestination getVirtualDestination() {
@@ -47,8 +48,8 @@ public class VirtualTopic implements Vir
}
public Destination intercept(Destination destination) {
- return selectorAware ? new
SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix()) :
- new VirtualTopicInterceptor(destination, getPrefix(),
getPostfix());
+ return selectorAware ? new
SelectorAwareVirtualTopicInterceptor(destination, getPrefix(), getPostfix(),
isLocal()) :
+ new VirtualTopicInterceptor(destination, getPrefix(),
getPostfix(), isLocal());
}
@@ -111,4 +112,12 @@ public class VirtualTopic implements Vir
public boolean isSelectorAware() {
return selectorAware;
}
+
+ public boolean isLocal() {
+ return local;
+ }
+
+ public void setLocal(boolean local) {
+ this.local = local;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java?rev=1186095&r1=1186094&r2=1186095&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java
Wed Oct 19 11:13:19 2011
@@ -33,15 +33,17 @@ public class VirtualTopicInterceptor ext
private String prefix;
private String postfix;
+ private boolean local;
- public VirtualTopicInterceptor(Destination next, String prefix, String
postfix) {
+ public VirtualTopicInterceptor(Destination next, String prefix, String
postfix, boolean local) {
super(next);
this.prefix = prefix;
this.postfix = postfix;
+ this.local = local;
}
public void send(ProducerBrokerExchange context, Message message) throws
Exception {
- if (!message.isAdvisory()) {
+ if (!message.isAdvisory() && !(local && message.getBrokerPath() !=
null)) {
ActiveMQDestination queueConsumers =
getQueueConsumersWildcard(message.getDestination());
send(context, message, queueConsumers);
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java?rev=1186095&r1=1186094&r2=1186095&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualDestDinamicallyIncludedDestTest.java
Wed Oct 19 11:13:19 2011
@@ -189,19 +189,19 @@ public class TwoBrokerVirtualDestDinamic
nc1.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority);
nc1.setSuppressDuplicateQueueSubscriptions(suppressDuplicateQueueSubscriptions);
nc1.addStaticallyIncludedDestination(ActiveMQDestination.createDestination("global.>",
ActiveMQDestination.TOPIC_TYPE));
-
nc1.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
+
//nc1.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>",
ActiveMQDestination.QUEUE_TYPE));
nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>",
ActiveMQDestination.TOPIC_TYPE));
-
//nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
+
nc1.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerA",
dynamicOnly, networkTTL, conduit);
nc2.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority);
nc2.setSuppressDuplicateQueueSubscriptions(suppressDuplicateQueueSubscriptions);
nc2.addStaticallyIncludedDestination(ActiveMQDestination.createDestination("global.>",
ActiveMQDestination.TOPIC_TYPE));
-
nc2.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
+
//nc2.addExcludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>",
ActiveMQDestination.QUEUE_TYPE));
nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("global.>",
ActiveMQDestination.TOPIC_TYPE));
-
//nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
+
nc2.addDynamicallyIncludedDestination(ActiveMQDestination.createDestination("Consumer.*.global.>",
ActiveMQDestination.QUEUE_TYPE));
}
private BrokerService createAndConfigureBroker(URI uri) throws Exception {
@@ -211,7 +211,9 @@ public class TwoBrokerVirtualDestDinamic
// make all topics virtual and consumers use the default prefix
VirtualDestinationInterceptor virtualDestinationInterceptor = new
VirtualDestinationInterceptor();
- virtualDestinationInterceptor.setVirtualDestinations(new
VirtualDestination[]{new VirtualTopic()});
+ VirtualTopic vTopic = new VirtualTopic();
+ vTopic.setLocal(true);
+ virtualDestinationInterceptor.setVirtualDestinations(new
VirtualDestination[]{vTopic});
DestinationInterceptor[] destinationInterceptors = new
DestinationInterceptor[]{virtualDestinationInterceptor};
broker.setDestinationInterceptors(destinationInterceptors);
return broker;