Repository: activemq
Updated Branches:
  refs/heads/master 4e23adfcc -> 83827f277
https://issues.apache.org/jira/browse/AMQ-6341 Wait on broker response for async broker commands. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/83827f27 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/83827f27 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/83827f27 Branch: refs/heads/master Commit: 83827f27709a2b8b1f8d080a4913100b80fe3429 Parents: 4e23adf Author: Timothy Bish <[email protected]> Authored: Wed Jun 29 20:13:34 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Wed Jun 29 20:13:34 2016 -0400 ---------------------------------------------------------------------- .../transport/amqp/protocol/AmqpReceiver.java | 12 ++++-- .../transport/amqp/protocol/AmqpSender.java | 42 +++++++++++++------- .../transport/amqp/protocol/AmqpSession.java | 12 ++++-- 3 files changed, 44 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/83827f27/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java index 07abb42..3ae018e 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java @@ -90,10 +90,16 @@ public class AmqpReceiver extends AmqpAbstractReceiver { @Override public void close() { if (!isClosed() && isOpened()) { - sendToActiveMQ(new RemoveInfo(getProducerId())); - } + sendToActiveMQ(new RemoveInfo(getProducerId()), new ResponseHandler() { - super.close(); + @Override + public void onResponse(AmqpProtocolConverter 
converter, Response response) throws IOException { + AmqpReceiver.super.close(); + } + }); + } else { + super.close(); + } } //----- Configuration accessors ------------------------------------------// http://git-wip-us.apache.org/repos/asf/activemq/blob/83827f27/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java index 9fb85a3..12bd627 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java @@ -118,12 +118,18 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { if (!isClosed() && isOpened()) { RemoveInfo removeCommand = new RemoveInfo(getConsumerId()); removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); - sendToActiveMQ(removeCommand); - session.unregisterSender(getConsumerId()); - } + sendToActiveMQ(removeCommand, new ResponseHandler() { - super.detach(); + @Override + public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { + session.unregisterSender(getConsumerId()); + AmqpSender.super.detach(); + } + }); + } else { + super.detach(); + } } @Override @@ -131,21 +137,27 @@ public class AmqpSender extends AmqpAbstractLink<Sender> { if (!isClosed() && isOpened()) { RemoveInfo removeCommand = new RemoveInfo(getConsumerId()); removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId); - sendToActiveMQ(removeCommand); - if (consumerInfo.isDurable()) { - RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); - rsi.setConnectionId(session.getConnection().getConnectionId()); - rsi.setSubscriptionName(getEndpoint().getName()); - 
rsi.setClientId(session.getConnection().getClientId()); + sendToActiveMQ(removeCommand, new ResponseHandler() { - sendToActiveMQ(rsi); - } + @Override + public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { + if (consumerInfo.isDurable()) { + RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); + rsi.setConnectionId(session.getConnection().getConnectionId()); + rsi.setSubscriptionName(getEndpoint().getName()); + rsi.setClientId(session.getConnection().getClientId()); - session.unregisterSender(getConsumerId()); - } + sendToActiveMQ(rsi); + } - super.close(); + session.unregisterSender(getConsumerId()); + AmqpSender.super.close(); + } + }); + } else { + super.close(); + } } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/83827f27/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java index 20a8b9f..c390b8c 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java @@ -108,11 +108,15 @@ public class AmqpSession implements AmqpResource { public void close() { LOG.debug("Session {} closed", getSessionId()); - getEndpoint().setContext(null); - getEndpoint().close(); - getEndpoint().free(); + connection.sendToActiveMQ(new RemoveInfo(getSessionId()), new ResponseHandler() { - connection.sendToActiveMQ(new RemoveInfo(getSessionId())); + @Override + public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { + getEndpoint().setContext(null); + getEndpoint().close(); + getEndpoint().free(); + } + }); } /**
