https://issues.apache.org/jira/browse/AMQ-6027
Adding support for consumers on virtual destinations to create network demand. This behavior is turned off by default but can be enabled. For example, if a consumer comes online for a queue that is part of a VirtualTopic, this will cause a network of brokers to forward messages because a demand subscription will be created. Same for if a consumer comes online for a forwarded destination from a composite destination. There is also an option to enable flow based on the existence of a virtual destination if the virtual destination is forwarding to a Queue. Full configuration instructions for this feature will be on the wiki page. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cc81680e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cc81680e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cc81680e Branch: refs/heads/master Commit: cc81680e10e5c7140ec3e28091df23e9d3c3233b Parents: 480b3e7 Author: Christopher L. Shannon (cshannon) <[email protected]> Authored: Tue Oct 20 18:15:30 2015 +0000 Committer: Christopher L. Shannon (cshannon) <[email protected]> Committed: Mon Nov 9 20:07:43 2015 +0000 ---------------------------------------------------------------------- .../activemq/advisory/AdvisoryBroker.java | 267 ++++ ...tinationFilterVirtualDestinationMatcher.java | 53 + .../advisory/VirtualDestinationMatcher.java | 29 + .../java/org/apache/activemq/broker/Broker.java | 5 + .../apache/activemq/broker/BrokerFilter.java | 13 + .../apache/activemq/broker/BrokerService.java | 35 + .../org/apache/activemq/broker/EmptyBroker.java | 11 + .../org/apache/activemq/broker/ErrorBroker.java | 13 + .../activemq/broker/MutableBrokerFilter.java | 13 + .../region/virtual/CompositeDestination.java | 41 + .../broker/region/virtual/CompositeQueue.java | 5 + .../broker/region/virtual/CompositeTopic.java | 5 + .../broker/region/virtual/VirtualTopic.java | 49 + .../network/DemandForwardingBridgeSupport.java | 3 +- .../network/NetworkBridgeConfiguration.java | 28 +- .../activemq/advisory/AdvisorySupport.java | 31 + .../activemq/command/NetworkBridgeFilter.java | 8 +- .../plugin/UpdateVirtualDestinationsTask.java | 48 + activemq-unit-tests/pom.xml | 4 + .../network/VirtualConsumerDemandTest.java | 1418 ++++++++++++++++++ 20 files changed, 2076 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 36f5f0b..bc5f105 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -18,6 +18,7 @@ package org.apache.activemq.advisory; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; @@ -40,6 +41,7 @@ import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.TopicSubscription; +import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; @@ -55,6 +57,7 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; +import org.apache.activemq.command.SessionId; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.usage.Usage; @@ -78,6 +81,22 @@ public class AdvisoryBroker extends BrokerFilter { private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock(); protected final Map<ConsumerId, ConsumerInfo> consumers = new LinkedHashMap<ConsumerId, ConsumerInfo>(); + /** + * This is a set to track all of the virtual destinations that have been added to the broker so + * they can be easily referenced later. + */ + protected final Set<VirtualDestination> virtualDestinations = Collections.newSetFromMap(new ConcurrentHashMap<VirtualDestination, Boolean>()); + /** + * This is a map to track all consumers that exist on the virtual destination so that we can fire + * an advisory later when they go away to remove the demand. + */ + protected final ConcurrentMap<ConsumerInfo, VirtualDestination> virtualDestinationConsumers = new ConcurrentHashMap<>(); + /** + * This is a map to track unique demand for the existence of a virtual destination so we make sure + * we don't send duplicate advisories. + */ + protected final ConcurrentMap<VirtualConsumerPair, ConsumerInfo> brokerConsumerDests = new ConcurrentHashMap<>(); + protected final ConcurrentMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>(); protected final ConcurrentMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>(); protected final ConcurrentMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>(); @@ -85,6 +104,8 @@ public class AdvisoryBroker extends BrokerFilter { private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); + private VirtualDestinationMatcher virtualDestinationMatcher = new DestinationFilterVirtualDestinationMatcher(); + public AdvisoryBroker(Broker next) { super(next); advisoryProducerId.setConnectionId(ID_GENERATOR.generateId()); @@ -112,6 +133,15 @@ public class AdvisoryBroker extends BrokerFilter { consumersLock.writeLock().lock(); try { consumers.put(info.getConsumerId(), info); + + //check if this is a consumer on a destination that matches a virtual destination + if (getBrokerService().isUseVirtualDestSubs()) { + for (VirtualDestination virtualDestination : virtualDestinations) { + if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) { + fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination); + } + } + } } finally { consumersLock.writeLock().unlock(); } @@ -171,6 +201,15 @@ public class AdvisoryBroker extends BrokerFilter { } } + // Replay the virtual destination consumers. + if (AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) { + for (Iterator<ConsumerInfo> iter = virtualDestinationConsumers.keySet().iterator(); iter.hasNext(); ) { + ConsumerInfo key = iter.next(); + ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(key.getDestination()); + fireConsumerAdvisory(context, key.getDestination(), topic, key); + } + } + // Replay network bridges if (AdvisorySupport.isNetworkBridgeAdvisoryTopic(info.getDestination())) { for (Iterator<BrokerInfo> iter = networkBridges.keySet().iterator(); iter.hasNext(); ) { @@ -199,6 +238,16 @@ public class AdvisoryBroker extends BrokerFilter { public Destination addDestination(ConnectionContext context, ActiveMQDestination destination, boolean create) throws Exception { Destination answer = super.addDestination(context, destination, create); if (!AdvisorySupport.isAdvisoryTopic(destination)) { + //for queues, create demand if isUseVirtualDestSubsOnCreation is true + if (getBrokerService().isUseVirtualDestSubsOnCreation() && destination.isQueue()) { + //check if this new destination matches a virtual destination that exists + for (VirtualDestination virtualDestination : virtualDestinations) { + if (virtualDestinationMatcher.matches(virtualDestination, destination)) { + fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination); + } + } + } + DestinationInfo info = new DestinationInfo(context.getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination); DestinationInfo previous = destinations.putIfAbsent(destination, info); if (previous == null) { @@ -228,6 +277,28 @@ public class AdvisoryBroker extends BrokerFilter { super.removeDestination(context, destination, timeout); DestinationInfo info = destinations.remove(destination); if (info != null) { + + //on destination removal, remove all demand if using virtual dest subs + if (getBrokerService().isUseVirtualDestSubs()) { + for (ConsumerInfo consumerInfo : virtualDestinationConsumers.keySet()) { + //find all consumers for this virtual destination + VirtualDestination virtualDestination = virtualDestinationConsumers.get(consumerInfo); + + //find a consumer that matches this virtualDest and destination + if (virtualDestinationMatcher.matches(virtualDestination, destination)) { + //in case of multiple matches + VirtualConsumerPair key = new VirtualConsumerPair(virtualDestination, destination); + ConsumerInfo i = brokerConsumerDests.get(key); + if (consumerInfo.equals(i)) { + if (brokerConsumerDests.remove(key) != null) { + fireVirtualDestinationRemoveAdvisory(context, consumerInfo); + break; + } + } + } + } + } + // ensure we don't modify (and loose/overwrite) an in-flight add advisory, so duplicate info = info.copy(); info.setDestination(destination); @@ -285,6 +356,11 @@ public class AdvisoryBroker extends BrokerFilter { consumersLock.writeLock().lock(); try { consumers.remove(info.getConsumerId()); + + //remove the demand for this consumer if it matches a virtual destination + if(getBrokerService().isUseVirtualDestSubs()) { + fireVirtualDestinationRemoveAdvisory(context, info); + } } finally { consumersLock.writeLock().unlock(); } @@ -467,6 +543,140 @@ public class AdvisoryBroker extends BrokerFilter { } } + private final IdGenerator connectionIdGenerator = new IdGenerator("advisory"); + private final LongSequenceGenerator sessionIdGenerator = new LongSequenceGenerator(); + private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); + + @Override + public void virtualDestinationAdded(ConnectionContext context, + VirtualDestination virtualDestination) { + super.virtualDestinationAdded(context, virtualDestination); + + if (virtualDestinations.add(virtualDestination)) { + try { + // Don't advise advisory topics. + if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) { + + //create demand for consumers on virtual destinations + consumersLock.readLock().lock(); + try { + //loop through existing destinations to see if any match this newly + //created virtual destination + if (getBrokerService().isUseVirtualDestSubsOnCreation()) { + //for matches that are a queue, fire an advisory for demand + for (ActiveMQDestination destination : destinations.keySet()) { + if(destination.isQueue()) { + if (virtualDestinationMatcher.matches(virtualDestination, destination)) { + fireVirtualDestinationAddAdvisory(context, null, destination, virtualDestination); + } + } + } + } + + //loop through existing consumers to see if any of them are consuming on a destination + //that matches the new virtual destination + for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext(); ) { + ConsumerInfo info = iter.next(); + if (virtualDestinationMatcher.matches(virtualDestination, info.getDestination())) { + fireVirtualDestinationAddAdvisory(context, info, info.getDestination(), virtualDestination); + } + } + } finally { + consumersLock.readLock().unlock(); + } + } + } catch (Exception e) { + handleFireFailure("virtualDestinationAdded", e); + } + } + } + + private void fireVirtualDestinationAddAdvisory(ConnectionContext context, ConsumerInfo info, ActiveMQDestination activeMQDest, + VirtualDestination virtualDestination) throws Exception { + //if no consumer info, we need to create one - this is the case when an advisory is fired + //because of the existence of a destination matching a virtual destination + if (info == null) { + ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId()); + SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId()); + ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); + + info = new ConsumerInfo(consumerId); + + //store the virtual destination and the activeMQDestination as a pair so that we can keep track + //of all matching forwarded destinations that caused demand + if(brokerConsumerDests.putIfAbsent(new VirtualConsumerPair(virtualDestination, activeMQDest), info) == null) { + info.setDestination(virtualDestination.getVirtualDestination()); + ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); + + if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { + fireConsumerAdvisory(context, info.getDestination(), topic, info); + } + } + //this is the case of a real consumer coming online + } else { + info = info.copy(); + info.setDestination(virtualDestination.getVirtualDestination()); + ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); + + if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { + fireConsumerAdvisory(context, info.getDestination(), topic, info); + } + } + } + + @Override + public void virtualDestinationRemoved(ConnectionContext context, + VirtualDestination virtualDestination) { + super.virtualDestinationRemoved(context, virtualDestination); + + if (virtualDestinations.remove(virtualDestination)) { + try { + consumersLock.readLock().lock(); + try { + // remove the demand created by the addition of the virtual destination + if (getBrokerService().isUseVirtualDestSubsOnCreation()) { + if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) { + for (ConsumerInfo info : virtualDestinationConsumers.keySet()) { + //find all consumers for this virtual destination + if (virtualDestinationConsumers.get(info).equals(virtualDestination)) { + fireVirtualDestinationRemoveAdvisory(context, info); + } + + //check consumers created for the existence of a destination to see if they + //match the consumerinfo and clean up + for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) { + ConsumerInfo i = brokerConsumerDests.get(activeMQDest); + if (info.equals(i)) { + brokerConsumerDests.remove(activeMQDest); + } + } + } + } + } + } finally { + consumersLock.readLock().unlock(); + } + } catch (Exception e) { + handleFireFailure("virtualDestinationAdded", e); + } + } + } + + private void fireVirtualDestinationRemoveAdvisory(ConnectionContext context, + ConsumerInfo info) throws Exception { + + VirtualDestination virtualDestination = virtualDestinationConsumers.remove(info); + if (virtualDestination != null) { + ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(virtualDestination.getVirtualDestination()); + + ActiveMQDestination dest = info.getDestination(); + + if (!dest.isTemporary() || destinations.containsKey(dest)) { + fireConsumerAdvisory(context, dest, topic, info.createRemoveCommand()); + } + } + } + @Override public void isFull(ConnectionContext context, Destination destination, Usage usage) { super.isFull(context, destination, usage); @@ -681,4 +891,61 @@ public class AdvisoryBroker extends BrokerFilter { public Map<ActiveMQDestination, DestinationInfo> getAdvisoryDestinations() { return destinations; } + + private class VirtualConsumerPair { + private final VirtualDestination virtualDestination; + + //destination that matches this virtualDestination as part target + //this is so we can keep track of more than one destination that might + //match the virtualDestination and cause demand + private final ActiveMQDestination activeMQDestination; + + public VirtualConsumerPair(VirtualDestination virtualDestination, + ActiveMQDestination activeMQDestination) { + super(); + this.virtualDestination = virtualDestination; + this.activeMQDestination = activeMQDestination; + } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + getOuterType().hashCode(); + result = prime + * result + + ((activeMQDestination == null) ? 0 : activeMQDestination + .hashCode()); + result = prime + * result + + ((virtualDestination == null) ? 0 : virtualDestination + .hashCode()); + return result; + } + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + VirtualConsumerPair other = (VirtualConsumerPair) obj; + if (!getOuterType().equals(other.getOuterType())) + return false; + if (activeMQDestination == null) { + if (other.activeMQDestination != null) + return false; + } else if (!activeMQDestination.equals(other.activeMQDestination)) + return false; + if (virtualDestination == null) { + if (other.virtualDestination != null) + return false; + } else if (!virtualDestination.equals(other.virtualDestination)) + return false; + return true; + } + private AdvisoryBroker getOuterType() { + return AdvisoryBroker.this; + } + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/advisory/DestinationFilterVirtualDestinationMatcher.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/DestinationFilterVirtualDestinationMatcher.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/DestinationFilterVirtualDestinationMatcher.java new file mode 100644 index 0000000..5c57cf0 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/DestinationFilterVirtualDestinationMatcher.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +import org.apache.activemq.broker.region.virtual.CompositeDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualTopic; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.filter.DestinationFilter; + +/** + * This class will use a destination filter to see if the activeMQ destination matches + * the given virtual destination + * + */ +public class DestinationFilterVirtualDestinationMatcher implements VirtualDestinationMatcher { + + /* (non-Javadoc) + * @see org.apache.activemq.advisory.VirtualDestinationMatcher#matches(org.apache.activemq.broker.region.virtual.VirtualDestination) + */ + @Override + public boolean matches(VirtualDestination virtualDestination, ActiveMQDestination activeMQDest) { + if (virtualDestination instanceof CompositeDestination) { + DestinationFilter filter = DestinationFilter.parseFilter(virtualDestination.getMappedDestinations()); + if (filter.matches(activeMQDest)) { + return true; + } + } else if (virtualDestination instanceof VirtualTopic) { + DestinationFilter filter = DestinationFilter.parseFilter(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT)); + if (filter.matches(activeMQDest)) { + return true; + } + } + + return false; + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/advisory/VirtualDestinationMatcher.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/VirtualDestinationMatcher.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/VirtualDestinationMatcher.java new file mode 100644 index 0000000..571a311 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/VirtualDestinationMatcher.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.advisory; + +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.command.ActiveMQDestination; + +/** + * + * + */ +public interface VirtualDestinationMatcher { + + public boolean matches(VirtualDestination virtualDestination, ActiveMQDestination activeMQDest); +} http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java index fa8e4fd..87cb3bc 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/Broker.java @@ -26,6 +26,7 @@ import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Region; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -385,6 +386,10 @@ public interface Broker extends Region, Service { */ void isFull(ConnectionContext context,Destination destination,Usage usage); + void virtualDestinationAdded(ConnectionContext context, VirtualDestination virtualDestination); + + void virtualDestinationRemoved(ConnectionContext context, VirtualDestination virtualDestination); + /** * called when the broker becomes the master in a master/slave * configuration http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java index 132b46d..2a8ae71 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -24,6 +24,7 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -360,6 +361,18 @@ public class BrokerFilter implements Broker { } @Override + public void virtualDestinationAdded(ConnectionContext context, + VirtualDestination virtualDestination) { + next.virtualDestinationAdded(context, virtualDestination); + } + + @Override + public void virtualDestinationRemoved(ConnectionContext context, + VirtualDestination virtualDestination) { + next.virtualDestinationRemoved(context, virtualDestination); + } + + @Override public void nowMasterBroker() { next.nowMasterBroker(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java index 61a4cef..5e7dd97 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -203,6 +203,15 @@ public class BrokerService implements Service { private boolean useVirtualTopics = true; private boolean useMirroredQueues = false; private boolean useTempMirroredQueues = true; + /** + * Whether or not virtual destination subscriptions should cause network demand + */ + private boolean useVirtualDestSubs = false; + /** + * Whether or no the creation of destinations that match virtual destinations + * should cause network demand + */ + private boolean useVirtualDestSubsOnCreation = false; private BrokerId brokerId; private volatile DestinationInterceptor[] destinationInterceptors; private ActiveMQDestination[] destinations; @@ -2699,6 +2708,14 @@ public class BrokerService implements Service { if (virtualDestination instanceof VirtualTopic) { consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT)); } + if (isUseVirtualDestSubs()) { + try { + broker.virtualDestinationAdded(getAdminConnectionContext(), virtualDestination); + LOG.debug("Adding virtual destination: {}", virtualDestination); + } catch (Exception e) { + LOG.warn("Could not fire virtual destination consumer advisory", e); + } + } } } } @@ -3133,4 +3150,22 @@ public class BrokerService implements Service { public void setRejectDurableConsumers(boolean rejectDurableConsumers) { this.rejectDurableConsumers = rejectDurableConsumers; } + + public boolean isUseVirtualDestSubs() { + return useVirtualDestSubs; + } + + public void setUseVirtualDestSubs( + boolean useVirtualDestSubs) { + this.useVirtualDestSubs = useVirtualDestSubs; + } + + public boolean isUseVirtualDestSubsOnCreation() { + return useVirtualDestSubsOnCreation; + } + + public void setUseVirtualDestSubsOnCreation( + boolean useVirtualDestSubsOnCreation) { + this.useVirtualDestSubsOnCreation = useVirtualDestSubsOnCreation; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java index 8185554..c4059a0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -25,6 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -346,6 +347,16 @@ public class EmptyBroker implements Broker { } @Override + public void virtualDestinationAdded(ConnectionContext context, + VirtualDestination virtualDestination) { + } + + @Override + public void virtualDestinationRemoved(ConnectionContext context, + VirtualDestination virtualDestination) { + } + + @Override public void nowMasterBroker() { } http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java index ae42141..35501e3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -25,6 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -362,6 +363,18 @@ public class ErrorBroker implements Broker { } @Override + public void virtualDestinationAdded(ConnectionContext context, + VirtualDestination virtualDestination) { + throw new BrokerStoppedException(this.message); + } + + @Override + public void virtualDestinationRemoved(ConnectionContext context, + VirtualDestination virtualDestination) { + throw new BrokerStoppedException(this.message); + } + + @Override public void nowMasterBroker() { throw new BrokerStoppedException(this.message); } http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index 2eea2e8..6306325 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -371,6 +372,18 @@ public class MutableBrokerFilter implements Broker { } @Override + public void virtualDestinationAdded(ConnectionContext context, + VirtualDestination virtualDestination) { + getNext().virtualDestinationAdded(context, virtualDestination); + } + + @Override + public void virtualDestinationRemoved(ConnectionContext context, + VirtualDestination virtualDestination) { + getNext().virtualDestinationRemoved(context, virtualDestination); + } + + @Override public void nowMasterBroker() { getNext().nowMasterBroker(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java index 5658839..1b976c0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java @@ -143,4 +143,45 @@ public abstract class CompositeDestination implements VirtualDestination { } }; } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (concurrentSend ? 1231 : 1237); + result = prime * result + (copyMessage ? 1231 : 1237); + result = prime * result + (forwardOnly ? 1231 : 1237); + result = prime * result + + ((forwardTo == null) ? 0 : forwardTo.hashCode()); + result = prime * result + ((name == null) ? 0 : name.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + CompositeDestination other = (CompositeDestination) obj; + if (concurrentSend != other.concurrentSend) + return false; + if (copyMessage != other.copyMessage) + return false; + if (forwardOnly != other.forwardOnly) + return false; + if (forwardTo == null) { + if (other.forwardTo != null) + return false; + } else if (!forwardTo.equals(other.forwardTo)) + return false; + if (name == null) { + if (other.name != null) + return false; + } else if (!name.equals(other.name)) + return false; + return true; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java index 1b0f75d..d253d9f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeQueue.java @@ -38,4 +38,9 @@ public class CompositeQueue extends CompositeDestination { // nothing to do for mapped destinations return destination; } + + @Override + public String toString() { + return "CompositeQueue [" + getName() + "]"; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java index 667a80c..9b817d0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeTopic.java @@ -41,4 +41,9 @@ public class CompositeTopic extends CompositeDestination { } return destination; } + + @Override + public String toString() { + return "CompositeTopic [" + getName() + "]"; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java index 14ea3fe..7049ccb 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java @@ -194,4 +194,53 @@ public class VirtualTopic implements VirtualDestination { public void setTransactedSend(boolean transactedSend) { this.transactedSend = transactedSend; } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (concurrentSend ? 1231 : 1237); + result = prime * result + (local ? 1231 : 1237); + result = prime * result + ((name == null) ? 0 : name.hashCode()); + result = prime * result + ((postfix == null) ? 0 : postfix.hashCode()); + result = prime * result + ((prefix == null) ? 0 : prefix.hashCode()); + result = prime * result + (selectorAware ? 1231 : 1237); + result = prime * result + (transactedSend ? 1231 : 1237); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + VirtualTopic other = (VirtualTopic) obj; + if (concurrentSend != other.concurrentSend) + return false; + if (local != other.local) + return false; + if (name == null) { + if (other.name != null) + return false; + } else if (!name.equals(other.name)) + return false; + if (postfix == null) { + if (other.postfix != null) + return false; + } else if (!postfix.equals(other.postfix)) + return false; + if (prefix == null) { + if (other.prefix != null) + return false; + } else if (!prefix.equals(other.prefix)) + return false; + if (selectorAware != other.selectorAware) + return false; + if (transactedSend != other.transactedSend) + return false; + return true; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index ad6fd61..fac39ac 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -1352,7 +1352,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } protected void configureDemandSubscription(ConsumerInfo info, DemandSubscription sub) throws IOException { - if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination())) { + if (AdvisorySupport.isConsumerAdvisoryTopic(info.getDestination()) || + AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(info.getDestination())) { sub.getLocalInfo().setDispatchAsync(true); } else { sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync()); http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java index 3a59f30..09127a1 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java @@ -29,6 +29,7 @@ import org.apache.activemq.command.ConsumerInfo; public class NetworkBridgeConfiguration { private boolean conduitSubscriptions = true; + private boolean useVirtualDestSubs; private boolean dynamicOnly; private boolean dispatchAsync = true; private boolean decreaseNetworkConsumerPriority; @@ -237,11 +238,27 @@ public class NetworkBridgeConfiguration { filter.append("."); filter.append(destination.getPhysicalName()); delimiter = ","; + + if (useVirtualDestSubs) { + filter.append(delimiter); + filter.append(AdvisorySupport.VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX); + filter.append(destination.getDestinationTypeAsString()); + filter.append("."); + filter.append(destination.getPhysicalName()); + } } } return filter.toString(); } else { - return AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">"; + StringBuffer filter = new StringBuffer(); + filter.append(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX); + filter.append(">"); + if (useVirtualDestSubs) { + filter.append(","); + filter.append(AdvisorySupport.VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX); + filter.append(">"); + } + return filter.toString(); } } else { // prepend consumer advisory prefix @@ -449,4 +466,13 @@ public class NetworkBridgeConfiguration { this.checkDuplicateMessagesOnDuplex = checkDuplicateMessagesOnDuplex; } + public boolean isUseVirtualDestSus() { + return useVirtualDestSubs; + } + + public void setUseVirtualDestSubs( + boolean useVirtualDestSubs) { + this.useVirtualDestSubs = useVirtualDestSubs; + } + } http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java index b26c600..ac3ee03 100755 --- a/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java +++ b/activemq-client/src/main/java/org/apache/activemq/advisory/AdvisorySupport.java @@ -37,8 +37,11 @@ public final class AdvisorySupport { public static final String QUEUE_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Queue."; public static final String TOPIC_PRODUCER_ADVISORY_TOPIC_PREFIX = PRODUCER_ADVISORY_TOPIC_PREFIX + "Topic."; public static final String CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Consumer."; + public static final String VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "VirtualDestination.Consumer."; public static final String QUEUE_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue."; public static final String TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX = CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic."; + public static final String QUEUE_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX + "Queue."; + public static final String TOPIC_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX = VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX + "Topic."; public static final String EXPIRED_TOPIC_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Topic."; public static final String EXPIRED_QUEUE_MESSAGES_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "Expired.Queue."; public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic."; @@ -116,6 +119,16 @@ public final class AdvisorySupport { return getAdvisoryTopic(destination, prefix, true); } + public static ActiveMQTopic getVirtualDestinationConsumerAdvisoryTopic(ActiveMQDestination destination) { + String prefix; + if (destination.isQueue()) { + prefix = QUEUE_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX; + } else { + prefix = TOPIC_VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX; + } + return getAdvisoryTopic(destination, prefix, true); + } + public static ActiveMQTopic getProducerAdvisoryTopic(Destination destination) throws JMSException { return getProducerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); } @@ -389,6 +402,24 @@ public final class AdvisorySupport { } } + public static boolean isVirtualDestinationConsumerAdvisoryTopic(Destination destination) throws JMSException { + return isVirtualDestinationConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); + } + + public static boolean isVirtualDestinationConsumerAdvisoryTopic(ActiveMQDestination destination) { + if (destination.isComposite()) { + ActiveMQDestination[] compositeDestinations = destination.getCompositeDestinations(); + for (int i = 0; i < compositeDestinations.length; i++) { + if (isVirtualDestinationConsumerAdvisoryTopic(compositeDestinations[i])) { + return true; + } + } + return false; + } else { + return destination.isTopic() && destination.getPhysicalName().startsWith(VIRTUAL_DESTINATION_CONSUMER_ADVISORY_TOPIC_PREFIX); + } + } + public static boolean isSlowConsumerAdvisoryTopic(Destination destination) throws JMSException { return isSlowConsumerAdvisoryTopic(ActiveMQMessageTransformation.transformDestination(destination)); } http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java b/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java index 245c098..5bd80b0 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/NetworkBridgeFilter.java @@ -51,14 +51,17 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression { this.consumerInfo = consumerInfo; } + @Override public byte getDataStructureType() { return DATA_STRUCTURE_TYPE; } + @Override public boolean isMarshallAware() { return false; } + @Override public boolean matches(MessageEvaluationContext mec) throws JMSException { try { // for Queues - the message can be acknowledged and dropped whilst @@ -72,6 +75,7 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression { } } + @Override public Object evaluate(MessageEvaluationContext message) throws JMSException { return matches(message) ? Boolean.TRUE : Boolean.FALSE; } @@ -125,7 +129,9 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression { } public static boolean isAdvisoryInterpretedByNetworkBridge(Message message) { - return AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) || AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination()); + return AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) || + AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(message.getDestination()) || + AdvisorySupport.isTempDestinationAdvisoryTopic(message.getDestination()); } public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) { http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java index cd0121c..ef7e1b5 100644 --- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java +++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/UpdateVirtualDestinationsTask.java @@ -18,16 +18,23 @@ package org.apache.activemq.plugin; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.CompositeDestinationInterceptor; import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class UpdateVirtualDestinationsTask implements Runnable { + public static final Logger LOG = LoggerFactory.getLogger(UpdateVirtualDestinationsTask.class); private final AbstractRuntimeConfigurationBroker plugin; public UpdateVirtualDestinationsTask( @@ -49,11 +56,52 @@ public abstract class UpdateVirtualDestinationsTask implements Runnable { // update existing interceptor final VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) destinationInterceptor; + Set<VirtualDestination> existingVirtualDests = new HashSet<>(); + Collections.addAll(existingVirtualDests, virtualDestinationInterceptor.getVirtualDestinations()); + + Set<VirtualDestination> newVirtualDests = new HashSet<>(); + Collections.addAll(newVirtualDests, getVirtualDestinations()); + + Set<VirtualDestination> addedVirtualDests = new HashSet<>(); + Set<VirtualDestination> removedVirtualDests = new HashSet<>(); + //detect new virtual destinations + for (VirtualDestination newVirtualDest : newVirtualDests) { + if (!existingVirtualDests.contains(newVirtualDest)) { + addedVirtualDests.add(newVirtualDest); + } + } + //detect removed virtual destinations + for (VirtualDestination existingVirtualDest : existingVirtualDests) { + if (!newVirtualDests.contains(existingVirtualDest)) { + removedVirtualDests.add(existingVirtualDest); + } + } + virtualDestinationInterceptor .setVirtualDestinations(getVirtualDestinations()); plugin.info("applied updates to: " + virtualDestinationInterceptor); updatedExistingInterceptor = true; + + ConnectionContext connectionContext; + try { + connectionContext = plugin.getBrokerService().getAdminConnectionContext(); + //signal updates + if (plugin.getBrokerService().isUseVirtualDestSubs()) { + for (VirtualDestination removedVirtualDest : removedVirtualDests) { + plugin.virtualDestinationRemoved(connectionContext, removedVirtualDest); + LOG.info("Removing virtual destination: {}", removedVirtualDest); + } + + for (VirtualDestination addedVirtualDest : addedVirtualDests) { + plugin.virtualDestinationAdded(connectionContext, addedVirtualDest); + LOG.info("Adding virtual destination: {}", addedVirtualDest); + } + } + + } catch (Exception e) { + LOG.warn("Could not process virtual destination advisories", e); + } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-unit-tests/pom.xml ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml index 5df7fcf..e03b246 100755 --- a/activemq-unit-tests/pom.xml +++ b/activemq-unit-tests/pom.xml @@ -71,6 +71,10 @@ <artifactId>activemq-partition</artifactId> </dependency> <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-runtime-config</artifactId> + </dependency> + <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_1.1_spec</artifactId> </dependency>
