Repository: activemq
Updated Branches:
  refs/heads/master b313209aa -> 3a5f127d5

AMQ-5738: Ensure the sender links for non-durable consumers also get unregistered, remove duplicate closed field, fix method names.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3a5f127d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3a5f127d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3a5f127d

Branch: refs/heads/master
Commit: 3a5f127d52f96466bb9e2c660ac5dbbed03ecf1d
Parents: b313209
Author: Robert Gemmell <rob...@apache.org>
Authored: Wed Apr 22 10:17:38 2015 +0100
Committer: Robert Gemmell <rob...@apache.org>
Committed: Wed Apr 22 10:17:38 2015 +0100

----------------------------------------------------------------------
 .../activemq/transport/amqp/protocol/AmqpConnection.java |  4 ++--
 .../activemq/transport/amqp/protocol/AmqpSender.java     | 11 +++++------
 .../activemq/transport/amqp/protocol/AmqpSession.java    |  6 +++---
 3 files changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/3a5f127d/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index bab16c9..c0ea6ad 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -563,11 +563,11 @@ public class AmqpConnection implements AmqpProtocolConverter {
 
     //----- Utility methods for connection resources to use ------------------//
 
-    void regosterSender(ConsumerId consumerId, AmqpSender sender) {
+    void registerSender(ConsumerId consumerId, AmqpSender sender) {
         subscriptionsByConsumerId.put(consumerId, sender);
     }
 
-    void unregosterSender(ConsumerId consumerId) {
+    void unregisterSender(ConsumerId consumerId) {
         subscriptionsByConsumerId.remove(consumerId);
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/3a5f127d/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index eefcbe3..13826b3 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -80,7 +80,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     private final ConsumerInfo consumerInfo;
     private final boolean presettle;
 
-    private boolean closed;
     private int currentCredit;
     private boolean draining;
     private long lastDeliveredSequenceId;
@@ -108,8 +107,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
 
     @Override
     public void open() {
-        if (!closed) {
-            session.regosterSender(getConsumerId(), this);
+        if (!isClosed()) {
+            session.registerSender(getConsumerId(), this);
         }
 
         super.open();
@@ -142,9 +141,9 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                 rsi.setClientId(session.getConnection().getClientId());
 
                 sendToActiveMQ(rsi, null);
-
-                session.unregisterSender(getConsumerId());
             }
+
+            session.unregisterSender(getConsumerId());
         }
 
         super.close();
@@ -350,7 +349,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     //----- Internal Implementation ------------------------------------------//
 
     public void pumpOutbound() throws Exception {
-        while (!closed) {
+        while (!isClosed()) {
             while (currentBuffer != null) {
                 int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
                 if (sent > 0) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/3a5f127d/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
index d9f0c0f..d2901ba 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -345,14 +345,14 @@ public class AmqpSession implements AmqpResource {
         connection.pumpProtonToSocket();
     }
 
-    public void regosterSender(ConsumerId consumerId, AmqpSender sender) {
+    public void registerSender(ConsumerId consumerId, AmqpSender sender) {
         consumers.put(consumerId, sender);
-        connection.regosterSender(consumerId, sender);
+        connection.registerSender(consumerId, sender);
     }
 
     public void unregisterSender(ConsumerId consumerId) {
         consumers.remove(consumerId);
-        connection.unregosterSender(consumerId);
+        connection.unregisterSender(consumerId);
     }
 
     //----- Configuration accessors ------------------------------------------//