Repository: activemq
Updated Branches:
  refs/heads/trunk 0c0fadcdc -> ff64b14bc

https://issues.apache.org/jira/browse/AMQ-5186 - remove amqp producers

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ff64b14b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ff64b14b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ff64b14b

Branch: refs/heads/trunk
Commit: ff64b14bc78466df96d16b1d04e862a7ddef3204
Parents: 0c0fadc
Author: Dejan Bosanac <[email protected]>
Authored: Thu May 15 15:13:01 2014 +0200
Committer: Dejan Bosanac <[email protected]>
Committed: Thu May 15 15:13:01 2014 +0200

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java | 114 ++++++++++---------
 1 file changed, 62 insertions(+), 52 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq/blob/ff64b14b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 02621fc..b8a27c4 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -553,6 +553,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         private final ProducerId producerId;
         private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
         private final ActiveMQDestination destination;
+        private boolean closed;
 
         public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
             this.producerId = producerId;
@@ -561,70 +562,79 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
         @Override
         protected void onMessage(final Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
-            EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
-            final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
-            current = null;
+            if (!closed) {
+                EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
+                final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
+                current = null;
 
-            if (destination != null) {
-                message.setJMSDestination(destination);
-            }
-            message.setProducerId(producerId);
+                if (destination != null) {
+                    message.setJMSDestination(destination);
+                }
+                message.setProducerId(producerId);
 
-            // Always override the AMQP client's MessageId with our own. Preserve the
-            // original in the TextView property for later Ack.
-            MessageId messageId = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
+                // Always override the AMQP client's MessageId with our own. Preserve the
+                // original in the TextView property for later Ack.
+                MessageId messageId = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
 
-            MessageId amqpMessageId = message.getMessageId();
-            if (amqpMessageId != null) {
-                if (amqpMessageId.getTextView() != null) {
-                    messageId.setTextView(amqpMessageId.getTextView());
-                } else {
-                    messageId.setTextView(amqpMessageId.toString());
+                MessageId amqpMessageId = message.getMessageId();
+                if (amqpMessageId != null) {
+                    if (amqpMessageId.getTextView() != null) {
+                        messageId.setTextView(amqpMessageId.getTextView());
+                    } else {
+                        messageId.setTextView(amqpMessageId.toString());
+                    }
                 }
-            }
 
-            message.setMessageId(messageId);
+                message.setMessageId(messageId);
 
-            LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId + ":" + messageId.getProducerSequenceId());
+                LOG.trace("Inbound Message:{} from Producer:{}", message.getMessageId(), producerId + ":" + messageId.getProducerSequenceId());
 
-            DeliveryState remoteState = delivery.getRemoteState();
-            if (remoteState != null && remoteState instanceof TransactionalState) {
-                TransactionalState s = (TransactionalState) remoteState;
-                long txid = toLong(s.getTxnId());
-                message.setTransactionId(new LocalTransactionId(connectionId, txid));
-            }
+                DeliveryState remoteState = delivery.getRemoteState();
+                if (remoteState != null && remoteState instanceof TransactionalState) {
+                    TransactionalState s = (TransactionalState) remoteState;
+                    long txid = toLong(s.getTxnId());
+                    message.setTransactionId(new LocalTransactionId(connectionId, txid));
+                }
 
-            // Lets handle the case where the expiration was set, but the timestamp
-            // was not set by the client. Lets assign the timestamp now, and adjust the
-            // expiration.
-            if (message.getExpiration() != 0) {
-                if (message.getTimestamp() == 0) {
-                    message.setTimestamp(System.currentTimeMillis());
-                    message.setExpiration(message.getTimestamp() + message.getExpiration());
+                // Lets handle the case where the expiration was set, but the timestamp
+                // was not set by the client. Lets assign the timestamp now, and adjust the
+                // expiration.
+                if (message.getExpiration() != 0) {
+                    if (message.getTimestamp() == 0) {
+                        message.setTimestamp(System.currentTimeMillis());
+                        message.setExpiration(message.getTimestamp() + message.getExpiration());
+                    }
                 }
-            }
 
-            message.onSend();
-            sendToActiveMQ(message, new ResponseHandler() {
-                @Override
-                public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
-                    if (!delivery.remotelySettled()) {
-                        if (response.isException()) {
-                            ExceptionResponse er = (ExceptionResponse) response;
-                            Rejected rejected = new Rejected();
-                            ErrorCondition condition = new ErrorCondition();
-                            condition.setCondition(Symbol.valueOf("failed"));
-                            condition.setDescription(er.getException().getMessage());
-                            rejected.setError(condition);
-                            delivery.disposition(rejected);
+                message.onSend();
+                sendToActiveMQ(message, new ResponseHandler() {
+                    @Override
+                    public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
+                        if (!delivery.remotelySettled()) {
+                            if (response.isException()) {
+                                ExceptionResponse er = (ExceptionResponse) response;
+                                Rejected rejected = new Rejected();
+                                ErrorCondition condition = new ErrorCondition();
+                                condition.setCondition(Symbol.valueOf("failed"));
+                                condition.setDescription(er.getException().getMessage());
+                                rejected.setError(condition);
+                                delivery.disposition(rejected);
+                            }
                         }
+                        receiver.flow(1);
+                        delivery.disposition(Accepted.getInstance());
+                        delivery.settle();
+                        pumpProtonToSocket();
                     }
-                    receiver.flow(1);
-                    delivery.disposition(Accepted.getInstance());
-                    delivery.settle();
-                    pumpProtonToSocket();
-                }
-            });
+                });
+            }
+        }
+
+        @Override
+        public void onClose() throws Exception {
+            if (!closed) {
+                sendToActiveMQ(new RemoveInfo(producerId), null);
+            }
         }
     }
 
