Updated Branches: refs/heads/trunk 9eb7fb906 -> e102e64e9
Use non-deprecated output methods for proton to allow for faster bulk sends of outbound amqp frames. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e102e64e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e102e64e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e102e64e Branch: refs/heads/trunk Commit: e102e64e9da9477494e73fe7291f15cf50b4ad70 Parents: 9eb7fb9 Author: Timothy Bish <[email protected]> Authored: Tue Feb 4 12:47:42 2014 -0500 Committer: Timothy Bish <[email protected]> Committed: Tue Feb 4 12:47:42 2014 -0500 ---------------------------------------------------------------------- .../transport/amqp/AmqpProtocolConverter.java | 16 +++++++-------- .../activemq/transport/amqp/AmqpWireFormat.java | 21 ++++++++++++++++++-- 2 files changed, 27 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/e102e64e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 62a05b9..7e24957 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.nio.ByteBuffer; import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; @@ -159,16 +160,13 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { void pumpProtonToSocket() { try { - int size = 1024 * 64; - byte data[] = new byte[size]; boolean done = false; while (!done) { - int count = protonTransport.output(data, 0, size); - if (count > 0) { - final Buffer buffer; - buffer = new Buffer(data, 0, count); - // System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 ")); - amqpTransport.sendToAmqp(buffer); + ByteBuffer toWrite = protonTransport.getOutputBuffer(); + if (toWrite != null && toWrite.hasRemaining()) { +// // System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 ")); + amqpTransport.sendToAmqp(toWrite); + protonTransport.outputConsumed(); } else { done = true; } @@ -248,10 +246,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { sasl.done(Sasl.SaslOutcome.PN_SASL_OK); amqpTransport.getWireFormat().magicRead = false; sasl = null; + LOG.debug("SASL [PLAIN] Handshake complete."); } else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0])) { sasl.done(Sasl.SaslOutcome.PN_SASL_OK); amqpTransport.getWireFormat().magicRead = false; sasl = null; + LOG.debug("SASL [ANONYMOUS] Handshake complete."); } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/e102e64e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java index 4a11374..13a264a 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java @@ -21,6 +21,10 @@ import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import org.apache.activemq.util.ByteArrayInputStream; import org.apache.activemq.util.ByteArrayOutputStream; @@ -53,8 +57,21 @@ public class AmqpWireFormat implements WireFormat { @Override public void marshal(Object command, DataOutput dataOut) throws IOException { - Buffer frame = (Buffer) command; - frame.writeTo(dataOut); + if (command instanceof ByteBuffer) { + ByteBuffer buffer = (ByteBuffer) command; + + if (dataOut instanceof OutputStream) { + WritableByteChannel channel = Channels.newChannel((OutputStream) dataOut); + channel.write(buffer); + } else { + while (buffer.hasRemaining()) { + dataOut.writeByte(buffer.get()); + } + } + } else { + Buffer frame = (Buffer) command; + frame.writeTo(dataOut); + } } boolean magicRead = false;
