Repository: qpid-jms Updated Branches: refs/heads/master 856e2b9ac -> 7750a1c27
QPIDJMS-399 Split write and flush of data into two operations. Allow outbound data to be written and then flushed as two separate steps. This allows quicker release of blocking operations, since the flush can be done after the caller has been notified of success, and it can improve performance when a batch of writes is flushed together. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/7750a1c2 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/7750a1c2 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/7750a1c2 Branch: refs/heads/master Commit: 7750a1c27589261b197d7b746506e19d8771b145 Parents: 856e2b9 Author: Timothy Bish <tabish...@gmail.com> Authored: Thu Jun 28 15:09:45 2018 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Thu Jun 28 15:09:45 2018 -0400 ---------------------------------------------------------------------- .../jms/provider/amqp/AmqpFixedProducer.java | 4 +- .../qpid/jms/provider/amqp/AmqpProvider.java | 41 ++++++++++++++++++-- .../apache/qpid/jms/transports/Transport.java | 25 ++++++++++-- .../jms/transports/netty/NettyTcpTransport.java | 20 ++++++++-- .../jms/transports/netty/NettyWsTransport.java | 15 ++++++- .../transports/netty/NettyTcpTransportTest.java | 12 +++--- .../transports/netty/NettyWsTransportTest.java | 10 ++--- 7 files changed, 104 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7750a1c2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index b2036fe..0f50b5c 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -166,7 +166,7 @@ public class AmqpFixedProducer extends AmqpProducer { // Put it on the wire and let it fail if the connection is broken, if it does // get written then continue on to determine when we should complete it. - if (provider.pumpToProtonTransport(request)) { + if (provider.pumpToProtonTransport(request, false)) { // For presettled messages we can just mark as successful and we are done, but // for any other message we still track it until the remote settles. If the send // was tagged as asynchronous we must mark the original request as complete but @@ -177,6 +177,8 @@ public class AmqpFixedProducer extends AmqpProducer { } else if (envelope.isSendAsync()) { send.getOriginalRequest().onSuccess(); } + + provider.getTransport().flush(); } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7750a1c2/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 7153bbe..890ebdb 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -116,6 +116,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP private static final AtomicInteger PROVIDER_SEQUENCE = new AtomicInteger(); private static final NoOpAsyncResult NOOP_REQUEST = new NoOpAsyncResult(); + private static final int DEFAULT_MAX_WRITE_BYTES_BEFORE_FLUSH = 128 * 1024; + private volatile ProviderListener listener; private volatile AmqpConnection connection; private AmqpSaslAuthenticator authenticator; @@ -131,6 +133,7 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP private int drainTimeout = 60000; private long sessionOutoingWindow = -1; // Use proton default private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE; + private int maxWriteBytesBeforeFlush = DEFAULT_MAX_WRITE_BYTES_BEFORE_FLUSH; private boolean allowNonSecureRedirects; @@ -695,8 +698,9 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP request.onSuccess(); pumpToProtonTransport(request); } else { - pumpToProtonTransport(request); + pumpToProtonTransport(request, false); request.onSuccess(); + transport.flush(); } } catch (Throwable t) { request.onFailure(t); @@ -1037,12 +1041,18 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP } protected boolean pumpToProtonTransport() { - return pumpToProtonTransport(NOOP_REQUEST); + return pumpToProtonTransport(NOOP_REQUEST, true); } protected boolean pumpToProtonTransport(AsyncResult request) { + return pumpToProtonTransport(request, true); + } + + protected boolean pumpToProtonTransport(AsyncResult request, boolean flush) { try { boolean done = false; + int bytesWritten = 0; + while (!done) { ByteBuffer toWrite = protonTransport.getOutputBuffer(); if (toWrite != null && toWrite.hasRemaining()) { @@ -1053,12 +1063,22 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP TRACE_BYTES.info("Sending: {}", ByteBufUtil.hexDump(outbound)); } - transport.send(outbound); + bytesWritten += outbound.readableBytes(); + if (flush && bytesWritten >= getMaxWriteBytesBeforeFlush()) { + transport.flush(); + bytesWritten = 0; + } + + transport.write(outbound); protonTransport.outputConsumed(); } else { done = true; } } + + if (flush && bytesWritten > 0) { + transport.flush(); + } } catch (IOException e) { fireProviderException(e); request.onFailure(e); @@ -1254,6 +1274,21 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP return maxFrameSize; } + public int getMaxWriteBytesBeforeFlush() 
{ + return maxWriteBytesBeforeFlush; + } + + /** + * Sets the maximum number of bytes that will be written on a large set of batched writes + * before a flush is requested on the {@link Transport}. + * + * @param maxWriteBytesBeforeFlush + * number of bytes written before a flush is requested. + */ + public void setMaxWriteBytesBeforeFlush(int maxWriteBytesBeforeFlush) { + this.maxWriteBytesBeforeFlush = maxWriteBytesBeforeFlush; + } + /** * Sets the max frame size (in bytes). * http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7750a1c2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java index b6ad697..5c92b06 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java @@ -72,14 +72,33 @@ public interface Transport { ByteBuf allocateSendBuffer(int size) throws IOException; /** - * Sends a chunk of data over the Transport connection. + * Writes a chunk of data over the Transport connection without performing an + * explicit flush on the transport. * * @param output * The buffer of data that is to be transmitted. * - * @throws IOException if an error occurs during the send operation. + * @throws IOException if an error occurs during the write operation. */ - void send(ByteBuf output) throws IOException; + void write(ByteBuf output) throws IOException; + + /** + * Writes a chunk of data over the Transport connection and requests a flush of + * all pending queued write operations + * + * @param output + * The buffer of data that is to be transmitted. + * + * @throws IOException if an error occurs during the write operation. 
+ */ + void writeAndFlush(ByteBuf output) throws IOException; + + /** + * Request a flush of all pending writes to the underlying connection. + * + * @throws IOException if an error occurs during the flush operation. + */ + void flush() throws IOException; /** * Gets the currently set TransportListener instance http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7750a1c2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java index a5b7ef7..ab04a75 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java @@ -262,15 +262,27 @@ public class NettyTcpTransport implements Transport { } @Override - public void send(ByteBuf output) throws IOException { + public void write(ByteBuf output) throws IOException { checkConnected(output); - LOG.trace("Attempted write of: {} bytes", output.readableBytes()); + channel.write(output); + } + @Override + public void writeAndFlush(ByteBuf output) throws IOException { + checkConnected(output); + LOG.trace("Attempted write and flush of: {} bytes", output.readableBytes()); channel.writeAndFlush(output); } @Override + public void flush() throws IOException { + checkConnected(); + LOG.trace("Attempted flush of pending writes"); + channel.flush(); + } + + @Override public TransportListener getTransportListener() { return listener; } @@ -377,13 +389,13 @@ public class NettyTcpTransport implements Transport { //----- State change handlers and checks ---------------------------------// protected final void checkConnected() throws IOException { - if (!connected.get()) { + if (!connected.get() || 
!channel.isActive()) { throw new IOException("Cannot send to a non-connected transport."); } } private void checkConnected(ByteBuf output) throws IOException { - if (!connected.get()) { + if (!connected.get() || !channel.isActive()) { ReferenceCountUtil.release(output); throw new IOException("Cannot send to a non-connected transport."); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7750a1c2/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java index 93370a3..034dfc3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWsTransport.java @@ -85,7 +85,7 @@ public class NettyWsTransport extends NettyTcpTransport { } @Override - public void send(ByteBuf output) throws IOException { + public void write(ByteBuf output) throws IOException { checkConnected(); int length = output.readableBytes(); if (length == 0) { @@ -94,6 +94,19 @@ public class NettyWsTransport extends NettyTcpTransport { LOG.trace("Attempted write of: {} bytes", length); + channel.write(new BinaryWebSocketFrame(output)); + } + + @Override + public void writeAndFlush(ByteBuf output) throws IOException { + checkConnected(); + int length = output.readableBytes(); + if (length == 0) { + return; + } + + LOG.trace("Attempted write and flush of: {} bytes", length); + channel.writeAndFlush(new BinaryWebSocketFrame(output)); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7750a1c2/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java ---------------------------------------------------------------------- diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java index 777ffbb..f463971 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java @@ -253,7 +253,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { Transport transport = createTransport(serverLocation, testListener, createClientOptions()); try { transport.connect(null); - transport.send(sendBuffer.copy()); + transport.writeAndFlush(sendBuffer.copy()); transports.add(transport); } catch (Exception e) { fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); @@ -334,7 +334,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { assertTrue(transport.isConnected()); - transport.send(Unpooled.buffer(0)); + transport.writeAndFlush(Unpooled.buffer(0)); transport.close(); } @@ -367,7 +367,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { sendBuffer.writeByte('A'); } - transport.send(sendBuffer); + transport.writeAndFlush(sendBuffer); assertTrue(Wait.waitFor(new Wait.Condition() { @Override @@ -419,7 +419,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { } for (int i = 0; i < iterations; ++i) { - transport.send(sendBuffer.copy()); + transport.writeAndFlush(sendBuffer.copy()); } assertTrue(Wait.waitFor(new Wait.Condition() { @@ -460,7 +460,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { ByteBuf sendBuffer = Unpooled.buffer(10); try { - transport.send(sendBuffer); + transport.writeAndFlush(sendBuffer); fail("Should throw on send of closed transport"); } catch (IOException ex) { } @@ -497,7 +497,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase { transport.close(); try { - transport.send(sendBuffer); + 
transport.writeAndFlush(sendBuffer); fail("Should throw on send of closed transport"); } catch (IOException ex) { } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/7750a1c2/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java index 993d526..390fd73 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java @@ -151,7 +151,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest { transport.setMaxFrameSize(FRAME_SIZE); transport.connect(null); transports.add(transport); - transport.send(sendBuffer.copy()); + transport.writeAndFlush(sendBuffer.copy()); } catch (Exception e) { fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); } @@ -202,7 +202,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest { transport.setMaxFrameSize(FRAME_SIZE); transport.connect(null); transports.add(transport); - transport.send(sendBuffer.copy()); + transport.writeAndFlush(sendBuffer.copy()); } catch (Exception e) { fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); } @@ -263,7 +263,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest { transport.setMaxFrameSize(FRAME_SIZE / 2); transport.connect(null); transports.add(transport); - transport.send(sendBuffer.copy()); + transport.writeAndFlush(sendBuffer.copy()); } catch (Exception e) { fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); } @@ -299,7 +299,7 @@ public class NettyWsTransportTest extends 
NettyTcpTransportTest { transport.setMaxFrameSize(FRAME_SIZE); transport.connect(null); transports.add(transport); - transport.send(sendBuffer.copy()); + transport.writeAndFlush(sendBuffer.copy()); } catch (Exception e) { fail("Should have connected to the server at " + serverLocation + " but got exception: " + e); } @@ -308,7 +308,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest { @Override public boolean isSatisfied() throws Exception { try { - transport.send(sendBuffer); + transport.writeAndFlush(sendBuffer); } catch (IOException e) { LOG.info("Transport send caught error:", e); return true; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org