Repository: activemq Updated Branches: refs/heads/master c1e7dbd53 -> 573b366ca
AMQ-6100 - use setOriginalDestination=false to make this behaviour optional, because AMQP cannot see the original destination property due to the immutability of the message properties, and users may depend on the existing original destination behaviour Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/573b366c Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/573b366c Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/573b366c Branch: refs/heads/master Commit: 573b366ca00a088835625d0014a5fbb27a947d7c Parents: c1e7dbd Author: gtully <[email protected]> Authored: Thu May 10 10:55:15 2018 +0100 Committer: gtully <[email protected]> Committed: Thu May 10 10:55:15 2018 +0100 ---------------------------------------------------------------------- .../broker/region/virtual/VirtualTopic.java | 9 +++++++++ .../region/virtual/VirtualTopicInterceptor.java | 8 ++++++-- .../MessageDestinationVirtualTopicTest.java | 19 +++++++++++++++++-- .../virtual/virtual-topic-network-test.xml | 6 +++++- 4 files changed, 37 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/573b366c/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java index b51f2b0..2c22c65 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java @@ -45,6 +45,7 @@ public class VirtualTopic implements VirtualDestination { private boolean concurrentSend = false; private boolean transactedSend = false; private 
boolean dropOnResourceLimit = false; + private boolean setOriginalDestination = true; @Override public ActiveMQDestination getVirtualDestination() { @@ -252,4 +253,12 @@ public class VirtualTopic implements VirtualDestination { public void setDropOnResourceLimit(boolean dropOnResourceLimit) { this.dropOnResourceLimit = dropOnResourceLimit; } + + public boolean isSetOriginalDestination() { + return setOriginalDestination; + } + + public void setSetOriginalDestination(boolean setOriginalDestination) { + this.setOriginalDestination = setOriginalDestination; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/573b366c/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java index 36cae3f..c8f058a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java @@ -47,6 +47,7 @@ public class VirtualTopicInterceptor extends DestinationFilter { private final boolean concurrentSend; private final boolean transactedSend; private final boolean dropMessageOnResourceLimit; + private final boolean setOriginalDestination; private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>(); @@ -58,6 +59,7 @@ public class VirtualTopicInterceptor extends DestinationFilter { this.concurrentSend = virtualTopic.isConcurrentSend(); this.transactedSend = virtualTopic.isTransactedSend(); this.dropMessageOnResourceLimit = virtualTopic.isDropOnResourceLimit(); + this.setOriginalDestination = virtualTopic.isSetOriginalDestination(); } public Topic 
getTopic() { @@ -137,8 +139,10 @@ public class VirtualTopicInterceptor extends DestinationFilter { private Message copy(Message original, ActiveMQDestination target) { Message msg = original.copy(); - msg.setDestination(target); - msg.setOriginalDestination(original.getDestination()); + if (setOriginalDestination) { + msg.setDestination(target); + msg.setOriginalDestination(original.getDestination()); + } return msg; } http://git-wip-us.apache.org/repos/asf/activemq/blob/573b366c/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java index f370efc..2c45566 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java @@ -43,6 +43,8 @@ public class MessageDestinationVirtualTopicTest { private SimpleMessageListener listener2; + private SimpleMessageListener listener3; + @Resource(name = "broker1") private BrokerService broker1; @@ -78,8 +80,16 @@ public class MessageDestinationVirtualTopicTest { listener1 = new SimpleMessageListener(); consumer1.setMessageListener(listener1); + // Create listener on Broker B1 for VT T2 witout setOriginalDest + Queue consumer3Queue = session1.createQueue("Consumer.A.VirtualTopic.T2"); + + // Bind listener on queue for consumer D + MessageConsumer consumerD = session1.createConsumer(consumer3Queue); + listener3 = new SimpleMessageListener(); + consumerD.setMessageListener(listener3); + // Create producer for topic, on B1 - Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1"); + Topic virtualTopicT1 = 
session1.createTopic("VirtualTopic.T1,VirtualTopic.T2"); producer = session1.createProducer(virtualTopicT1); } @@ -94,9 +104,10 @@ public class MessageDestinationVirtualTopicTest { init(); // Create a monitor - CountDownLatch monitor = new CountDownLatch(2); + CountDownLatch monitor = new CountDownLatch(3); listener1.setCountDown(monitor); listener2.setCountDown(monitor); + listener3.setCountDown(monitor); LOG.info("Sending message"); // Send a message on the topic @@ -112,9 +123,13 @@ public class MessageDestinationVirtualTopicTest { String lastJMSDestination1 = listener1.getLastJMSDestination(); System.err.println(lastJMSDestination1); + String lastJMSDestination3 = listener3.getLastJMSDestination(); + System.err.println(lastJMSDestination3); + // The destination names assertEquals("queue://Consumer.D.VirtualTopic.T1", lastJMSDestination2); assertEquals("queue://Consumer.C.VirtualTopic.T1", lastJMSDestination1); + assertEquals("topic://VirtualTopic.T2", lastJMSDestination3); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/573b366c/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml index 6b6199c..ab225d8 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml @@ -44,7 +44,9 @@ <amq:virtualDestinations> <!-- Virtual topic policies --> <!-- they should be local to avoid message duplicate --> - <amq:virtualTopic name="VirtualTopic.>" prefix="Consumer.*."/> + <amq:virtualTopic name="VirtualTopic.T1" 
prefix="Consumer.*."/> + <amq:virtualTopic name="VirtualTopic.T2" prefix="Consumer.*." setOriginalDestination="false"/> + </amq:virtualDestinations> </amq:virtualDestinationInterceptor> </amq:destinationInterceptors> @@ -66,6 +68,7 @@ <amq:destinations> <!-- topics --> <amq:topic physicalName="VirtualTopic.T1" /> + <amq:topic physicalName="VirtualTopic.T2" /> </amq:destinations> </amq:broker> @@ -109,6 +112,7 @@ <amq:destinations> <!-- topics --> <amq:topic physicalName="VirtualTopic.T1" /> + <amq:topic physicalName="VirtualTopic.T2" /> </amq:destinations> </amq:broker>
