This is an automated email from the ASF dual-hosted git repository.

jking pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/thrift.git
The following commit(s) were added to refs/heads/master by this push: new 6503043 THRIFT-4714: optimize java TFramedTransport to call write once per flush 6503043 is described below commit 6503043bc42ab96da14c25f3aee2bb4add719774 Author: James E. King III <jk...@apache.org> AuthorDate: Thu Jan 3 00:01:18 2019 -0500 THRIFT-4714: optimize java TFramedTransport to call write once per flush --- .../AutoExpandingBufferWriteTransport.java | 30 ++++++++++++++--- .../thrift/transport/TFastFramedTransport.java | 13 +++++--- .../apache/thrift/transport/TFramedTransport.java | 17 +++++++--- .../TestAutoExpandingBufferWriteTransport.java | 39 ++++++++++++++++++---- .../thrift/transport/TestTFramedTransport.java | 4 +-- 5 files changed, 81 insertions(+), 22 deletions(-) diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java index ad2ec55..ec7e7d4 100644 --- a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java +++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java @@ -25,10 +25,29 @@ public final class AutoExpandingBufferWriteTransport extends TTransport { private final AutoExpandingBuffer buf; private int pos; + private int res; - public AutoExpandingBufferWriteTransport(int initialCapacity) { + /** + * Constructor. + * @param initialCapacity the initial capacity of the buffer + * @param frontReserve space, if any, to reserve at the beginning such + * that the first write is after this reserve. + * This allows framed transport to reserve space + * for the frame buffer length. 
+ * @throws IllegalArgumentException if initialCapacity is less than one + * @throws IllegalArgumentException if frontReserve is less than zero + * @throws IllegalArgumentException if frontReserve is greater than initialCapacity + */ + public AutoExpandingBufferWriteTransport(int initialCapacity, int frontReserve) { + if (initialCapacity < 1) { + throw new IllegalArgumentException("initialCapacity"); + } + if (frontReserve < 0 || initialCapacity < frontReserve) { + throw new IllegalArgumentException("frontReserve"); + } this.buf = new AutoExpandingBuffer(initialCapacity); - this.pos = 0; + this.pos = frontReserve; + this.res = frontReserve; } @Override @@ -56,11 +75,14 @@ public final class AutoExpandingBufferWriteTransport extends TTransport { return buf; } - public int getPos() { + /** + * @return length of the buffer, including any front reserve + */ + public int getLength() { return pos; } public void reset() { - pos = 0; + pos = res; } } diff --git a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java index 891d798..a1fd249 100644 --- a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java @@ -106,8 +106,8 @@ public class TFastFramedTransport extends TTransport { this.underlying = underlying; this.maxLength = maxLength; this.initialBufferCapacity = initialBufferCapacity; - writeBuffer = new AutoExpandingBufferWriteTransport(initialBufferCapacity); readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity); + writeBuffer = new AutoExpandingBufferWriteTransport(initialBufferCapacity, 4); } @Override @@ -166,16 +166,19 @@ public class TFastFramedTransport extends TTransport { readBuffer.consumeBuffer(len); } + /** + * Only clears the read buffer! 
+ */ public void clear() { readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity); } @Override public void flush() throws TTransportException { - int length = writeBuffer.getPos(); - TFramedTransport.encodeFrameSize(length, i32buf); - underlying.write(i32buf, 0, 4); - underlying.write(writeBuffer.getBuf().array(), 0, length); + int payloadLength = writeBuffer.getLength() - 4; + byte[] data = writeBuffer.getBuf().array(); + TFramedTransport.encodeFrameSize(payloadLength, data); + underlying.write(data, 0, payloadLength + 4); writeBuffer.reset(); underlying.flush(); } diff --git a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFramedTransport.java index fa531ef..a006c3a 100644 --- a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java +++ b/lib/java/src/org/apache/thrift/transport/TFramedTransport.java @@ -66,16 +66,25 @@ public class TFramedTransport extends TTransport { } /** + * Something to fill in the first four bytes of the buffer + * to make room for the frame size. This allows the + * implementation to write once instead of twice. 
+ */ + private static final byte[] sizeFiller_ = new byte[] { 0x00, 0x00, 0x00, 0x00 }; + + /** * Constructor wraps around another transport */ public TFramedTransport(TTransport transport, int maxLength) { transport_ = transport; maxLength_ = maxLength; + writeBuffer_.write(sizeFiller_, 0, 4); } public TFramedTransport(TTransport transport) { transport_ = transport; maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH; + writeBuffer_.write(sizeFiller_, 0, 4); } public void open() throws TTransportException { @@ -155,12 +164,12 @@ public class TFramedTransport extends TTransport { @Override public void flush() throws TTransportException { byte[] buf = writeBuffer_.get(); - int len = writeBuffer_.len(); + int len = writeBuffer_.len() - 4; // account for the prepended frame size writeBuffer_.reset(); + writeBuffer_.write(sizeFiller_, 0, 4); // make room for the next frame's size data - encodeFrameSize(len, i32buf); - transport_.write(i32buf, 0, 4); - transport_.write(buf, 0, len); + encodeFrameSize(len, buf); // this is the frame length without the filler + transport_.write(buf, 0, len + 4); // we have to write the frame size and frame data transport_.flush(); } diff --git a/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java index 6b04feb..86b5b0d 100644 --- a/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java +++ b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java @@ -19,26 +19,51 @@ package org.apache.thrift.transport; import java.nio.ByteBuffer; +import org.junit.Test; +import static org.junit.Assert.*; -import junit.framework.TestCase; - -public class TestAutoExpandingBufferWriteTransport extends TestCase { +public class TestAutoExpandingBufferWriteTransport { + @Test public void testIt() throws Exception { - AutoExpandingBufferWriteTransport t = new 
AutoExpandingBufferWriteTransport(1); + AutoExpandingBufferWriteTransport t = new AutoExpandingBufferWriteTransport(1, 0); + assertEquals(0, t.getLength()); assertEquals(1, t.getBuf().array().length); byte[] b1 = new byte[]{1,2,3}; t.write(b1); - assertEquals(3, t.getPos()); + assertEquals(3, t.getLength()); assertTrue(t.getBuf().array().length >= 3); assertEquals(ByteBuffer.wrap(b1), ByteBuffer.wrap(t.getBuf().array(), 0, 3)); t.reset(); + assertEquals(0, t.getLength()); assertTrue(t.getBuf().array().length >= 3); - assertEquals(0, t.getPos()); byte[] b2 = new byte[]{4,5}; t.write(b2); - assertEquals(2, t.getPos()); + assertEquals(2, t.getLength()); assertEquals(ByteBuffer.wrap(b2), ByteBuffer.wrap(t.getBuf().array(), 0, 2)); + + AutoExpandingBufferWriteTransport uut = new AutoExpandingBufferWriteTransport(8, 4); + assertEquals(4, uut.getLength()); + assertEquals(8, uut.getBuf().array().length); + uut.write(b1); + assertEquals(7, uut.getLength()); + assertEquals(8, uut.getBuf().array().length); + assertEquals(ByteBuffer.wrap(b1), ByteBuffer.wrap(uut.getBuf().array(), 4, 3)); + } + + @Test(expected = IllegalArgumentException.class) + public void testBadInitialSize() throws IllegalArgumentException { + new AutoExpandingBufferWriteTransport(0, 0); + } + + @Test(expected = IllegalArgumentException.class) + public void testBadFrontReserveSize() throws IllegalArgumentException { + new AutoExpandingBufferWriteTransport(4, -1); + } + + @Test(expected = IllegalArgumentException.class) + public void testTooSmallFrontReserveSize() throws IllegalArgumentException { + new AutoExpandingBufferWriteTransport(4, 5); } } diff --git a/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java b/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java index 7e889d6..e30d74b 100644 --- a/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java +++ b/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java @@ -125,11 +125,11 @@ public class 
TestTFramedTransport extends TestCase { assertEquals(0, countingTrans.writeCount); trans.flush(); - assertEquals(2, countingTrans.writeCount); + assertEquals(1, countingTrans.writeCount); trans.write(byteSequence(0, 245)); trans.flush(); - assertEquals(4, countingTrans.writeCount); + assertEquals(2, countingTrans.writeCount); DataInputStream din = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); assertEquals(256, din.readInt());