Compress transport API a bit and use only ByteBuf

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/44a422fd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/44a422fd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/44a422fd

Branch: refs/heads/master Commit: 44a422fd60964f662731d4f7cb475cc5efd455f0 Parents: 858e12b Author: Timothy Bish <[email protected]> Authored: Tue Jan 20 18:27:45 2015 -0500 Committer: Timothy Bish <[email protected]> Committed: Tue Jan 20 18:27:45 2015 -0500 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpProvider.java | 7 ++++++- .../org/apache/qpid/jms/transports/Transport.java | 11 ----------- .../jms/transports/netty/NettyTcpTransport.java | 16 ---------------- .../jms/transports/plain/PlainTcpTransport.java | 13 ++++--------- .../qpid/jms/transports/vertx/TcpTransport.java | 18 +----------------- 5 files changed, 11 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/44a422fd/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 5054dfb..3c3a470 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -17,6 +17,7 @@ package org.apache.qpid.jms.provider.amqp; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.util.ReferenceCountUtil; import java.io.IOException; @@ -786,7 +787,11 @@ public class AmqpProvider implements Provider, TransportListener { if (isTraceBytes()) { TRACE_BYTES.info("Sending: {}", toWrite.toString()); } - transport.send(toWrite); + + byte[] copy = new byte[toWrite.remaining()]; + toWrite.get(copy); + + transport.send(Unpooled.wrappedBuffer(copy)); protonTransport.outputConsumed(); } else { done = true; 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/44a422fd/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java index d8e221e..9a95234 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java @@ -19,7 +19,6 @@ package org.apache.qpid.jms.transports; import io.netty.buffer.ByteBuf; import java.io.IOException; -import java.nio.ByteBuffer; /** * Base class for all QpidJMS Transport instances. @@ -54,16 +53,6 @@ public interface Transport { * * @throws IOException if an error occurs during the send operation. */ - void send(ByteBuffer output) throws IOException; - - /** - * Sends a chunk of data over the Transport connection. - * - * @param output - * The buffer of data that is to be transmitted. - * - * @throws IOException if an error occurs during the send operation. 
- */ void send(ByteBuf output) throws IOException; /** http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/44a422fd/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java index e0d276b..b9832ce 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java @@ -18,7 +18,6 @@ package org.apache.qpid.jms.transports.netty; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; @@ -32,7 +31,6 @@ import io.netty.channel.socket.nio.NioSocketChannel; import java.io.IOException; import java.net.URI; -import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.qpid.jms.transports.Transport; @@ -142,20 +140,6 @@ public class NettyTcpTransport implements Transport { } @Override - public void send(ByteBuffer output) throws IOException { - checkConnected(); - int length = output.remaining(); - if (length == 0) { - return; - } - - byte[] copy = new byte[length]; - output.get(copy); - - send(Unpooled.wrappedBuffer(copy)); - } - - @Override public void send(ByteBuf output) throws IOException { checkConnected(); int length = output.readableBytes(); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/44a422fd/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java ---------------------------------------------------------------------- diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java index b736379..2adc685 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/plain/PlainTcpTransport.java @@ -188,21 +188,16 @@ public class PlainTcpTransport implements Transport, Runnable { } @Override - public void send(ByteBuffer output) throws IOException { + public void send(ByteBuf output) throws IOException { checkConnected(); - LOG.trace("Transport sending packet of size: {}", output.remaining()); + ByteBuffer toWrite = output.nioBuffer(); + LOG.trace("Transport sending packet of size: {}", toWrite.remaining()); WritableByteChannel channel = Channels.newChannel(dataOut); - channel.write(output); + channel.write(toWrite); dataOut.flush(); } @Override - public void send(ByteBuf output) throws IOException { - checkConnected(); - send(output.nioBuffer()); - } - - @Override public boolean isConnected() { return this.connected.get(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/44a422fd/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java index e824ec4..4207fd9 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/vertx/TcpTransport.java @@ -20,14 +20,13 @@ import io.netty.buffer.ByteBuf; import java.io.IOException; import java.net.URI; -import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.concurrent.atomic.AtomicReference; -import org.apache.qpid.jms.transports.TransportOptions; import org.apache.qpid.jms.transports.Transport; import org.apache.qpid.jms.transports.TransportListener; +import org.apache.qpid.jms.transports.TransportOptions; import org.apache.qpid.jms.util.IOExceptionSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,21 +173,6 @@ public class TcpTransport implements Transport { } @Override - public void send(ByteBuffer output) throws IOException { - checkConnected(); - int length = output.remaining(); - if (length == 0) { - return; - } - - byte[] copy = new byte[length]; - output.get(copy); - Buffer sendBuffer = new Buffer(copy); - - vertx.eventBus().send(socket.writeHandlerID(), sendBuffer); - } - - @Override public void send(ByteBuf output) throws IOException { checkConnected(); int length = output.readableBytes(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
