Repository: qpid-jms Updated Branches: refs/heads/master 72efc6992 -> 1819bb277
QPIDJMS-269 Resolve performance regression for transacted sends Remove some unnecessary accounting on transacted sends that leads to a performance degradation compared to previous releases. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/1819bb27 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/1819bb27 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/1819bb27 Branch: refs/heads/master Commit: 1819bb277f42a895c98c3b764299c0b7371af2e1 Parents: 72efc69 Author: Timothy Bish <[email protected]> Authored: Fri Mar 3 12:30:41 2017 -0500 Committer: Timothy Bish <[email protected]> Committed: Fri Mar 3 12:30:41 2017 -0500 ---------------------------------------------------------------------- .../amqp/AmqpAnonymousFallbackProducer.java | 6 --- .../qpid/jms/provider/amqp/AmqpConsumer.java | 4 +- .../jms/provider/amqp/AmqpFixedProducer.java | 11 ----- .../qpid/jms/provider/amqp/AmqpProducer.java | 10 ---- .../provider/amqp/AmqpTransactionContext.java | 52 ++++++++------------ 5 files changed, 22 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1819bb27/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java index 447b4b2..a73ec92 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java @@ -139,12 +139,6 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { return new JmsProducerId(producerIdKey, -1, producerIdCount++); } - @Override - public void addSendCompletionWatcher(AsyncResult watcher) { - throw new UnsupportedOperationException( - "The fallback producer parent should never have a watcher assigned."); - } - //----- AsyncResult objects used to complete the sends -------------------// private abstract class AnonymousRequest extends WrappedAsyncResult { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1819bb27/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 28f2ba5..b682c73 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -138,7 +138,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver public void run() { LOG.trace("Consumer {} drain request timed out", getConsumerId()); Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time"); - if (session.isTransacted() && session.getTransactionContext().isInTransaction(AmqpConsumer.this)) { + if (session.isTransacted() && session.getTransactionContext().isInTransaction(getConsumerId())) { stopRequest.onFailure(cause); stopRequest = null; } else { @@ -645,7 +645,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver } private boolean shouldDeferClose() { - if (getSession().isTransacted() && getSession().getTransactionContext().isInTransaction(this)) { + if (getSession().isTransacted() && getSession().getTransactionContext().isInTransaction(getConsumerId())) { return true; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1819bb27/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index 4acd419..4297147 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -303,17 +303,6 @@ public class AmqpFixedProducer extends AmqpProducer { } } - @Override - public void addSendCompletionWatcher(AsyncResult watcher) { - // If none pending signal done already. - // TODO - If we don't include blocked sends then update this. - if (blocked.isEmpty() && sent.isEmpty()) { - watcher.onSuccess(); - } else { - this.sendCompletionWatcher = watcher; - } - } - //----- Class used to manage held sends ----------------------------------// private class InFlightSend implements AsyncResult, AmqpExceptionBuilder { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1819bb27/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java index b862fb0..6999d76 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java @@ -94,14 +94,4 @@ public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo, public void setDelayedDeliverySupported(boolean delayedDeliverySupported) { this.delayedDeliverySupported = delayedDeliverySupported; } - - /** - * Allows a completion request to be added to this producer that will be notified - * once all outstanding sends have completed. - * - * @param watcher - * The AsyncResult that will be signaled once this producer has no pending sends. - */ - public abstract void addSendCompletionWatcher(AsyncResult watcher); - } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1819bb27/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java index 8f3aab8..1a43aef 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java @@ -17,12 +17,14 @@ package org.apache.qpid.jms.provider.amqp; import java.io.IOException; -import java.util.LinkedHashSet; -import java.util.Set; +import java.util.HashMap; +import java.util.Map; import javax.jms.IllegalStateException; import javax.jms.TransactionRolledBackException; +import org.apache.qpid.jms.meta.JmsConsumerId; +import org.apache.qpid.jms.meta.JmsProducerId; import org.apache.qpid.jms.meta.JmsSessionInfo; import org.apache.qpid.jms.meta.JmsTransactionId; import org.apache.qpid.jms.meta.JmsTransactionInfo; @@ -43,8 +45,8 @@ public class AmqpTransactionContext implements AmqpResourceParent { private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionContext.class); private final AmqpSession session; - private final Set<AmqpConsumer> txConsumers = new LinkedHashSet<AmqpConsumer>(); - private final Set<AmqpProducer> txProducers = new LinkedHashSet<AmqpProducer>(); + private final Map<JmsConsumerId, AmqpConsumer> txConsumers = new HashMap<>(); + private final Map<JmsProducerId, AmqpProducer> txProducers = new HashMap<>(); private JmsTransactionId current; private AmqpTransactionCoordinator coordinator; @@ -130,15 +132,8 @@ public class AmqpTransactionContext implements AmqpResourceParent { DischargeCompletion dischargeResult = new DischargeCompletion(request, true); - if (txProducers.isEmpty()) { - LOG.trace("TX Context[{}] committing current TX[[]]", this, current); - coordinator.discharge(current, dischargeResult, true); - } else { - SendCompletion producersSendCompletion = new SendCompletion(transactionInfo, dischargeResult, txProducers.size(), true); - for (AmqpProducer producer : txProducers) { - producer.addSendCompletionWatcher(producersSendCompletion); - } - } + LOG.trace("TX Context[{}] committing current TX[[]]", this, current); + coordinator.discharge(current, dischargeResult, true); } public void rollback(JmsTransactionInfo transactionInfo, final AsyncResult request) throws Exception { @@ -157,33 +152,26 @@ public class AmqpTransactionContext implements AmqpResourceParent { DischargeCompletion dischargeResult = new DischargeCompletion(request, false); - if (txProducers.isEmpty()) { - LOG.trace("TX Context[{}] rolling back current TX[[]]", this, current); - coordinator.discharge(current, dischargeResult, false); - } else { - SendCompletion producersSendCompletion = new SendCompletion(transactionInfo, dischargeResult, txProducers.size(), false); - for (AmqpProducer producer : txProducers) { - producer.addSendCompletionWatcher(producersSendCompletion); - } - } + LOG.trace("TX Context[{}] rolling back current TX[[]]", this, current); + coordinator.discharge(current, dischargeResult, false); } //----- Context utility methods ------------------------------------------// public void registerTxConsumer(AmqpConsumer consumer) { - txConsumers.add(consumer); + txConsumers.put(consumer.getConsumerId(), consumer); } - public boolean isInTransaction(AmqpConsumer consumer) { - return txConsumers.contains(consumer); + public boolean isInTransaction(JmsConsumerId consumerId) { + return txConsumers.containsKey(consumerId); } public void registerTxProducer(AmqpProducer producer) { - txProducers.add(producer); + txProducers.put(producer.getProducerId(), producer); } - public boolean isInTransaction(AmqpProducer producer) { - return txProducers.contains(producer); + public boolean isInTransaction(JmsProducerId producerId) { + return txProducers.containsKey(producerId); } public AmqpSession getSession() { @@ -214,19 +202,19 @@ public class AmqpTransactionContext implements AmqpResourceParent { //----- Transaction pre / post completion --------------------------------// private void preCommit() { - for (AmqpConsumer consumer : txConsumers) { + for (AmqpConsumer consumer : txConsumers.values()) { consumer.preCommit(); } } private void preRollback() { - for (AmqpConsumer consumer : txConsumers) { + for (AmqpConsumer consumer : txConsumers.values()) { consumer.preRollback(); } } private void postCommit() { - for (AmqpConsumer consumer : txConsumers) { + for (AmqpConsumer consumer : txConsumers.values()) { consumer.postCommit(); } @@ -235,7 +223,7 @@ public class AmqpTransactionContext implements AmqpResourceParent { } private void postRollback() { - for (AmqpConsumer consumer : txConsumers) { + for (AmqpConsumer consumer : txConsumers.values()) { consumer.postRollback(); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
