Repository: activemq Updated Branches: refs/heads/trunk b9d51bf1d -> 9b6f419d4
https://issues.apache.org/jira/browse/AMQ-5195 Sets the session incoming credit value and improves the flow handling to reduce chatter on each message send, which improves overall producer performance significantly. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9b6f419d Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9b6f419d Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9b6f419d Branch: refs/heads/trunk Commit: 9b6f419d441458a624ffc7a1f4132021dcdeb88a Parents: b9d51bf Author: Timothy Bish <[email protected]> Authored: Tue May 20 18:26:17 2014 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue May 20 18:26:17 2014 -0400 ---------------------------------------------------------------------- .../transport/amqp/AmqpProtocolConverter.java | 33 +++++++++++++++----- 1 file changed, 25 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/9b6f419d/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 05e20dc..36a41e6 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -170,6 +170,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { while (!done) { ByteBuffer toWrite = protonTransport.getOutputBuffer(); if (toWrite != null && toWrite.hasRemaining()) { + LOG.trace("Sending {} bytes out", toWrite.limit()); amqpTransport.sendToAmqp(toWrite); protonTransport.outputConsumed(); } else { @@ -463,6 +464,7 @@ class 
AmqpProtocolConverter implements IAmqpProtocolConverter { AmqpSessionContext sessionContext = new AmqpSessionContext(connectionId, nextSessionId++); session.setContext(sessionContext); sendToActiveMQ(new SessionInfo(sessionContext.sessionId), null); + session.setIncomingCapacity(AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE * prefetch); session.open(); } @@ -608,10 +610,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } message.onSend(); - sendToActiveMQ(message, new ResponseHandler() { - @Override - public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException { - if (!delivery.remotelySettled()) { + if (!delivery.remotelySettled()) { + sendToActiveMQ(message, new ResponseHandler() { + @Override + public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException { if (response.isException()) { ExceptionResponse er = (ExceptionResponse) response; Rejected rejected = new Rejected(); @@ -620,14 +622,29 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { condition.setDescription(er.getException().getMessage()); rejected.setError(condition); delivery.disposition(rejected); + } else { + if (receiver.getCredit() <= (prefetch * .2)) { + LOG.trace("Sending more credit ({}) to producer: {}", + prefetch - receiver.getCredit(), producerId); + receiver.flow(prefetch - receiver.getCredit()); + } + + delivery.disposition(Accepted.getInstance()); + delivery.settle(); } + + pumpProtonToSocket(); } - receiver.flow(1); - delivery.disposition(Accepted.getInstance()); - delivery.settle(); + }); + } else { + if (receiver.getCredit() <= (prefetch * .2)) { + LOG.trace("Sending more credit ({}) to producer: {}", + prefetch - receiver.getCredit(), producerId); + receiver.flow(prefetch - receiver.getCredit()); pumpProtonToSocket(); } - }); + sendToActiveMQ(message, null); + } } }
