This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch 1.9 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1.9 by this push: new 987ae65 Use optimized writeV methods in Mutations (#669) 987ae65 is described below commit 987ae65cc606bdaf6865c2c6810a13c767b7a5c5 Author: Mike Miller <mmil...@apache.org> AuthorDate: Tue Oct 2 11:03:05 2018 -0400 Use optimized writeV methods in Mutations (#669) * Moved optimized writeVLong and writeVInt to UnsynchronizedBuffer to replace use of WritableUtils methods. Each optimized method will only make one write call to the underlying OutputStream. --- .../org/apache/accumulo/core/data/Mutation.java | 13 +-- .../accumulo/core/util/UnsynchronizedBuffer.java | 101 +++++++++++++++------ .../core/util/UnsynchronizedBufferTest.java | 72 +++++++++++++++ .../accumulo/server/data/ServerMutation.java | 4 +- 4 files changed, 157 insertions(+), 33 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java index 091d50d..338444d 100644 --- a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java +++ b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java @@ -1116,6 +1116,7 @@ public class Mutation implements Writable { @Override public void write(DataOutput out) throws IOException { + final byte[] integerBuffer = new byte[5]; serialize(); byte hasValues = (values == null) ?
0 : (byte) 1; if (!replicationSources.isEmpty()) { @@ -1124,23 +1125,23 @@ public class Mutation implements Writable { } out.write((byte) (0x80 | hasValues)); - WritableUtils.writeVInt(out, row.length); + UnsynchronizedBuffer.writeVInt(out, integerBuffer, row.length); out.write(row); - WritableUtils.writeVInt(out, data.length); + UnsynchronizedBuffer.writeVInt(out, integerBuffer, data.length); out.write(data); - WritableUtils.writeVInt(out, entries); + UnsynchronizedBuffer.writeVInt(out, integerBuffer, entries); if (0x01 == (0x01 & hasValues)) { - WritableUtils.writeVInt(out, values.size()); + UnsynchronizedBuffer.writeVInt(out, integerBuffer, values.size()); for (int i = 0; i < values.size(); i++) { byte val[] = values.get(i); - WritableUtils.writeVInt(out, val.length); + UnsynchronizedBuffer.writeVInt(out, integerBuffer, val.length); out.write(val); } } if (0x02 == (0x02 & hasValues)) { - WritableUtils.writeVInt(out, replicationSources.size()); + UnsynchronizedBuffer.writeVInt(out, integerBuffer, replicationSources.size()); for (String source : replicationSources) { WritableUtils.writeString(out, source); } diff --git a/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java b/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java index b2fd932..2993f8b 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java +++ b/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java @@ -16,6 +16,8 @@ */ package org.apache.accumulo.core.util; +import java.io.DataOutput; +import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.io.WritableUtils; @@ -136,32 +138,7 @@ public class UnsynchronizedBuffer { */ public void writeVLong(long i) { reserve(9); - if (i >= -112 && i <= 127) { - data[offset++] = (byte) i; - return; - } - - int len = -112; - if (i < 0) { - i ^= -1L; // take one's complement' - len = -120; - } - - long tmp = i; - while (tmp != 0) { - tmp = tmp >> 8; 
- len--; - } - - data[offset++] = (byte) len; - - len = (len < -120) ? -(len + 120) : -(len + 112); - - for (int idx = len; idx != 0; idx--) { - int shiftbits = (idx - 1) * 8; - long mask = 0xFFL << shiftbits; - data[offset++] = (byte) ((i & mask) >> shiftbits); - } + offset = UnsynchronizedBuffer.writeVLong(data, offset, i); } } @@ -304,4 +281,76 @@ public class UnsynchronizedBuffer { return ret; } + + /** + * Use the provided byte[] to buffer only the bytes used to write out the integer i to the + * DataOutput out. This will only ever make one write call to the DataOutput. Use this instead of + * {@link WritableUtils#writeVInt(DataOutput, int)} which could make up to 5 separate writes to + * the underlying OutputStream. Is compatible with WritableUtils as it will write the same data. + */ + public static void writeVInt(DataOutput out, byte[] workBuffer, int i) throws IOException { + int size = UnsynchronizedBuffer.writeVInt(workBuffer, 0, i); + out.write(workBuffer, 0, size); + } + + /** + * Use the provided byte[] to buffer only the bytes used to write out the long i to the DataOutput + * out. This will only ever make one write call to the DataOutput. Use this instead of + * {@link WritableUtils#writeVLong(DataOutput, long)} which could make up to 9 separate writes to + * the underlying OutputStream. Is compatible with WritableUtils as it will write the same data. + */ + public static void writeVLong(DataOutput out, byte[] workBuffer, long i) throws IOException { + int size = UnsynchronizedBuffer.writeVLong(workBuffer, 0, i); + out.write(workBuffer, 0, size); + } + + /** + * Writes a variable int directly to a byte array. Is compatible with {@link WritableUtils} as it + * will write the same data. + */ + public static int writeVInt(byte[] dest, int offset, int i) { + return writeVLong(dest, offset, i); + } + + /** + * Writes a variable long directly to a byte array. Is compatible with {@link WritableUtils} as it + * will write the same data.
+ * + * @param dest + * The destination array for the long to be written to + * @param offset + * The location where to write the long to + * @param value + * The long value being written into byte array + * @return Returns the new offset location + */ + public static int writeVLong(byte[] dest, int offset, long value) { + if (value >= -112 && value <= 127) { + dest[offset++] = (byte) value; + return offset; + } + + int len = -112; + if (value < 0) { + value ^= -1L; // take one's complement' + len = -120; + } + + long tmp = value; + while (tmp != 0) { + tmp = tmp >> 8; + len--; + } + + dest[offset++] = (byte) len; + + len = (len < -120) ? -(len + 120) : -(len + 112); + + for (int idx = len; idx != 0; idx--) { + int shiftbits = (idx - 1) * 8; + long mask = 0xFFL << shiftbits; + dest[offset++] = (byte) ((value & mask) >> shiftbits); + } + return offset; + } } diff --git a/core/src/test/java/org/apache/accumulo/core/util/UnsynchronizedBufferTest.java b/core/src/test/java/org/apache/accumulo/core/util/UnsynchronizedBufferTest.java index 052d7b6..0038b06 100644 --- a/core/src/test/java/org/apache/accumulo/core/util/UnsynchronizedBufferTest.java +++ b/core/src/test/java/org/apache/accumulo/core/util/UnsynchronizedBufferTest.java @@ -18,9 +18,14 @@ package org.apache.accumulo.core.util; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.nio.ByteBuffer; +import java.util.Arrays; +import org.apache.hadoop.io.WritableUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -53,4 +58,71 @@ public class UnsynchronizedBufferTest { thrown.expect(ArrayIndexOutOfBoundsException.class); ub.readBytes(buf); } + + @Test + public void testWriteVMethods() throws Exception { + // writeV methods use an extra byte for length, unless value is only one byte + // 
Integer.MAX_VALUE = 0x7fffffff + testInteger(0x7fffffff, 4 + 1); + testInteger(0x7fffff, 3 + 1); + testInteger(0x7fff, 2 + 1); + testInteger(0x7f, 1); + + // Long.MAX_VALUE = 0x7fffffffffffffffL + testLong(0x7fffffffffffffffL, 8 + 1); + testLong(0x7fffffffffffffL, 7 + 1); + testLong(0x7fffffffffffL, 6 + 1); + testLong(0x7fffffffffL, 5 + 1); + testLong(0x7fffffffL, 4 + 1); + testLong(0x7fffffL, 3 + 1); + testLong(0x7fffL, 2 + 1); + testLong(0x7fL, 1); + } + + private void testInteger(int value, int length) throws Exception { + byte[] integerBuffer = new byte[5]; + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + UnsynchronizedBuffer.writeVInt(dos, integerBuffer, value); + dos.flush(); + assertEquals(length, baos.toByteArray().length); + } + } + + private void testLong(long value, int length) throws Exception { + byte[] longBuffer = new byte[9]; + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + UnsynchronizedBuffer.writeVLong(dos, longBuffer, value); + dos.flush(); + assertEquals(length, baos.toByteArray().length); + } + } + + @Test + public void compareWithWritableUtils() throws Exception { + byte[] hadoopBytes; + byte[] accumuloBytes; + int oneByteInt = 0x7f; + int threeByteInt = 0x7fff; + long sixByteLong = 0x7fffffffffL; + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + WritableUtils.writeVInt(dos, oneByteInt); + WritableUtils.writeVInt(dos, threeByteInt); + WritableUtils.writeVLong(dos, sixByteLong); + dos.flush(); + hadoopBytes = baos.toByteArray(); + } + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos)) { + UnsynchronizedBuffer.writeVInt(dos, new byte[5], oneByteInt); + UnsynchronizedBuffer.writeVInt(dos, new byte[5], threeByteInt); + UnsynchronizedBuffer.writeVLong(dos, new byte[9], 
sixByteLong); + dos.flush(); + accumuloBytes = baos.toByteArray(); + } + assertTrue("The byte array written to by UnsynchronizedBuffer is not equal to WritableUtils", + Arrays.equals(hadoopBytes, accumuloBytes)); + } } diff --git a/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java b/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java index a86fd85..9dfd850 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java +++ b/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.accumulo.core.data.ColumnUpdate; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.thrift.TMutation; +import org.apache.accumulo.core.util.UnsynchronizedBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableUtils; @@ -57,8 +58,9 @@ public class ServerMutation extends Mutation { @Override public void write(DataOutput out) throws IOException { + final byte[] timeBuffer = new byte[9]; super.write(out); - WritableUtils.writeVLong(out, systemTime); + UnsynchronizedBuffer.writeVLong(out, timeBuffer, systemTime); } public void setSystemTimestamp(long v) {