Repository: activemq Updated Branches: refs/heads/master 049f8da23 -> ffdaeb2bd
AMQ-5920 - make the use of a virtual topic (VT) transaction configurable; a transaction negates concurrentStoreAndDispatch and imposes local 2PC on mKahaDB, so it needs to be off by default Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ffdaeb2b Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ffdaeb2b Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ffdaeb2b Branch: refs/heads/master Commit: ffdaeb2bd19b299613de7aebc809079bc2cd4416 Parents: 049f8da Author: gtully <[email protected]> Authored: Mon Aug 17 15:35:36 2015 +0100 Committer: gtully <[email protected]> Committed: Mon Aug 17 15:35:36 2015 +0100 ---------------------------------------------------------------------- .../activemq/broker/region/virtual/VirtualTopic.java | 13 +++++++++++++ .../broker/region/virtual/VirtualTopicInterceptor.java | 5 ++++- .../broker/virtual/VirtualTopicFanoutPerfTest.java | 1 + 3 files changed, 18 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ffdaeb2b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java index 95fa333..14ea3fe 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java @@ -43,6 +43,7 @@ public class VirtualTopic implements VirtualDestination { private boolean selectorAware = false; private boolean local = false; private boolean concurrentSend = false; + private boolean transactedSend = false; @Override public ActiveMQDestination getVirtualDestination() { 
@@ -181,4 +182,16 @@ public class VirtualTopic implements VirtualDestination { public void setConcurrentSend(boolean concurrentSend) { this.concurrentSend = concurrentSend; } + + public boolean isTransactedSend() { + return transactedSend; + } + + /** + * When true, dispatch to matching destinations always uses a transaction. + * @param transactedSend + */ + public void setTransactedSend(boolean transactedSend) { + this.transactedSend = transactedSend; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/ffdaeb2b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java index 7967562..36c08e0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java @@ -43,6 +43,8 @@ public class VirtualTopicInterceptor extends DestinationFilter { private final String postfix; private final boolean local; private final boolean concurrentSend; + private final boolean transactedSend; + private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>(); public VirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) { @@ -51,6 +53,7 @@ public class VirtualTopicInterceptor extends DestinationFilter { this.postfix = virtualTopic.getPostfix(); this.local = virtualTopic.isLocal(); this.concurrentSend = virtualTopic.isConcurrentSend(); + this.transactedSend = virtualTopic.isTransactedSend(); } public Topic getTopic() { @@ -120,7 +123,7 @@ public class VirtualTopicInterceptor extends DestinationFilter { private LocalTransactionId 
beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception { LocalTransactionId result = null; - if (numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) { + if (transactedSend && numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) { result = new LocalTransactionId(new ConnectionId(message.getMessageId().getProducerId().toString()), message.getMessageId().getProducerSequenceId()); connectionContext.getBroker().beginTransaction(connectionContext, result); connectionContext.setTransaction(connectionContext.getTransactions().get(result)); http://git-wip-us.apache.org/repos/asf/activemq/blob/ffdaeb2b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java index 90cdeea..4ba82eb 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java @@ -55,6 +55,7 @@ public class VirtualTopicFanoutPerfTest { for (VirtualDestination virtualDestination : ((VirtualDestinationInterceptor) destinationInterceptor).getVirtualDestinations()) { if (virtualDestination instanceof VirtualTopic) { ((VirtualTopic) virtualDestination).setConcurrentSend(true); + ((VirtualTopic) virtualDestination).setTransactedSend(true); } } }
