Repository: activemq Updated Branches: refs/heads/trunk ae595c95b -> 2a0be3b0f
https://issues.apache.org/jira/browse/AMQ-5550 Ensure that the consumer and producer context instances are marked as closed when creation at the broker end fails. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2a0be3b0 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2a0be3b0 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2a0be3b0 Branch: refs/heads/trunk Commit: 2a0be3b0f09a5473e5bd7944fce5286742c80a58 Parents: ae595c9 Author: Timothy Bish <[email protected]> Authored: Fri Jan 30 10:35:35 2015 -0500 Committer: Timothy Bish <[email protected]> Committed: Fri Jan 30 10:35:35 2015 -0500 ---------------------------------------------------------------------- .../activemq/transport/amqp/AmqpProtocolConverter.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/2a0be3b0/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 238b8b0..131df8f 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -316,6 +316,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { Event event = null; while ((event = eventCollector.peek()) != null) { + if (amqpTransport.isTrace()) { + LOG.trace("Processing event: {}", event.getType()); + } switch (event.getType()) { case CONNECTION_REMOTE_OPEN: case CONNECTION_REMOTE_CLOSE: @@ -761,6 +764,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } } + public void close() { + closed = true; + } + public boolean isAnonymous() { return anonymous; } @@ -898,7 +905,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { dest = createDestination(remoteTarget); } - ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous); + final ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous); receiver.setContext(producerContext); receiver.flow(flow); @@ -916,7 +923,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } else { receiver.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage())); } + producerContext.closed = true; receiver.close(); + receiver.free(); } else { receiver.open(); } @@ -1423,7 +1432,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage())); } subscriptionsByConsumerId.remove(id); + consumerContext.closed = true; sender.close(); + sender.free(); } else { sessionContext.consumers.put(id, consumerContext); sender.open();
