This is an automated email from the ASF dual-hosted git repository.

mmiller pushed a commit to branch 1.9
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/1.9 by this push:
     new 987ae65  Use optimized writeV methods in Mutations (#669)
987ae65 is described below

commit 987ae65cc606bdaf6865c2c6810a13c767b7a5c5
Author: Mike Miller <mmil...@apache.org>
AuthorDate: Tue Oct 2 11:03:05 2018 -0400

    Use optimized writeV methods in Mutations (#669)
    
    * Moved optimized writeVLong and writeVInt to UnsynchronizedBuffer
    to replace use of WriteableUtils methods. Each optimized method will
    only make one write call to the underlying outputstream.
---
 .../org/apache/accumulo/core/data/Mutation.java    |  13 +--
 .../accumulo/core/util/UnsynchronizedBuffer.java   | 101 +++++++++++++++------
 .../core/util/UnsynchronizedBufferTest.java        |  72 +++++++++++++++
 .../accumulo/server/data/ServerMutation.java       |   4 +-
 4 files changed, 157 insertions(+), 33 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java 
b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
index 091d50d..338444d 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
@@ -1116,6 +1116,7 @@ public class Mutation implements Writable {
 
   @Override
   public void write(DataOutput out) throws IOException {
+    final byte[] integerBuffer = new byte[5];
     serialize();
     byte hasValues = (values == null) ? 0 : (byte) 1;
     if (!replicationSources.isEmpty()) {
@@ -1124,23 +1125,23 @@ public class Mutation implements Writable {
     }
     out.write((byte) (0x80 | hasValues));
 
-    WritableUtils.writeVInt(out, row.length);
+    UnsynchronizedBuffer.writeVInt(out, integerBuffer, row.length);
     out.write(row);
 
-    WritableUtils.writeVInt(out, data.length);
+    UnsynchronizedBuffer.writeVInt(out, integerBuffer, data.length);
     out.write(data);
-    WritableUtils.writeVInt(out, entries);
+    UnsynchronizedBuffer.writeVInt(out, integerBuffer, entries);
 
     if (0x01 == (0x01 & hasValues)) {
-      WritableUtils.writeVInt(out, values.size());
+      UnsynchronizedBuffer.writeVInt(out, integerBuffer, values.size());
       for (int i = 0; i < values.size(); i++) {
         byte val[] = values.get(i);
-        WritableUtils.writeVInt(out, val.length);
+        UnsynchronizedBuffer.writeVInt(out, integerBuffer, val.length);
         out.write(val);
       }
     }
     if (0x02 == (0x02 & hasValues)) {
-      WritableUtils.writeVInt(out, replicationSources.size());
+      UnsynchronizedBuffer.writeVInt(out, integerBuffer, 
replicationSources.size());
       for (String source : replicationSources) {
         WritableUtils.writeString(out, source);
       }
diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java 
b/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java
index b2fd932..2993f8b 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/UnsynchronizedBuffer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.accumulo.core.util;
 
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.io.WritableUtils;
@@ -136,32 +138,7 @@ public class UnsynchronizedBuffer {
      */
     public void writeVLong(long i) {
       reserve(9);
-      if (i >= -112 && i <= 127) {
-        data[offset++] = (byte) i;
-        return;
-      }
-
-      int len = -112;
-      if (i < 0) {
-        i ^= -1L; // take one's complement'
-        len = -120;
-      }
-
-      long tmp = i;
-      while (tmp != 0) {
-        tmp = tmp >> 8;
-        len--;
-      }
-
-      data[offset++] = (byte) len;
-
-      len = (len < -120) ? -(len + 120) : -(len + 112);
-
-      for (int idx = len; idx != 0; idx--) {
-        int shiftbits = (idx - 1) * 8;
-        long mask = 0xFFL << shiftbits;
-        data[offset++] = (byte) ((i & mask) >> shiftbits);
-      }
+      offset = UnsynchronizedBuffer.writeVLong(data, offset, i);
     }
   }
 
@@ -304,4 +281,76 @@ public class UnsynchronizedBuffer {
 
     return ret;
   }
+
+  /**
+   * Use the provided byte[] to buffer only the bytes used to write out the 
integer i to the
+   * DataOutput out. This will only ever make one write call to the 
DataOutput. Use this instead of
+   * {@link WritableUtils#writeVInt(DataOutput, int)} which could make up to 4 
separate writes to
+   * the underlying OutputStream. Is compatible with WritableUtils as it will 
write the same data.
+   */
+  public static void writeVInt(DataOutput out, byte[] workBuffer, int i) 
throws IOException {
+    int size = UnsynchronizedBuffer.writeVInt(workBuffer, 0, i);
+    out.write(workBuffer, 0, size);
+  }
+
+  /**
+   * Use the provided byte[] to buffer only the bytes used to write out the 
long i to the DataOutput
+   * out. This will only ever make one write call to the DataOutput. Use this 
instead of
+   * {@link WritableUtils#writeVLong(DataOutput, long)} which could make up to 
8 separate writes to
+   * the underlying OutputStream. Is compatible with WritableUtils as it will 
write the same data.
+   */
+  public static void writeVLong(DataOutput out, byte[] workBuffer, long i) 
throws IOException {
+    int size = UnsynchronizedBuffer.writeVLong(workBuffer, 0, i);
+    out.write(workBuffer, 0, size);
+  }
+
+  /**
+   * Writes a variable int directly to a byte array. Is compatible with {@link 
WritableUtils} as it
+   * will write the same data.
+   */
+  public static int writeVInt(byte[] dest, int offset, int i) {
+    return writeVLong(dest, offset, i);
+  }
+
+  /**
+   * Writes a variable long directly to a byte array. Is compatible with 
{@link WritableUtils} as it
+   * will write the same data.
+   *
+   * @param dest
+   *          The destination array for the long to be written to
+   * @param offset
+   *          The location where to write the long to
+   * @param value
+   *          The long value being written into byte array
+   * @return Returns the new offset location
+   */
+  public static int writeVLong(byte[] dest, int offset, long value) {
+    if (value >= -112 && value <= 127) {
+      dest[offset++] = (byte) value;
+      return offset;
+    }
+
+    int len = -112;
+    if (value < 0) {
+      value ^= -1L; // take one's complement'
+      len = -120;
+    }
+
+    long tmp = value;
+    while (tmp != 0) {
+      tmp = tmp >> 8;
+      len--;
+    }
+
+    dest[offset++] = (byte) len;
+
+    len = (len < -120) ? -(len + 120) : -(len + 112);
+
+    for (int idx = len; idx != 0; idx--) {
+      int shiftbits = (idx - 1) * 8;
+      long mask = 0xFFL << shiftbits;
+      dest[offset++] = (byte) ((value & mask) >> shiftbits);
+    }
+    return offset;
+  }
 }
diff --git 
a/core/src/test/java/org/apache/accumulo/core/util/UnsynchronizedBufferTest.java
 
b/core/src/test/java/org/apache/accumulo/core/util/UnsynchronizedBufferTest.java
index 052d7b6..0038b06 100644
--- 
a/core/src/test/java/org/apache/accumulo/core/util/UnsynchronizedBufferTest.java
+++ 
b/core/src/test/java/org/apache/accumulo/core/util/UnsynchronizedBufferTest.java
@@ -18,9 +18,14 @@ package org.apache.accumulo.core.util;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
+import org.apache.hadoop.io.WritableUtils;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -53,4 +58,71 @@ public class UnsynchronizedBufferTest {
     thrown.expect(ArrayIndexOutOfBoundsException.class);
     ub.readBytes(buf);
   }
+
+  @Test
+  public void testWriteVMethods() throws Exception {
+    // writeV methods use an extra byte for length, unless value is only one 
byte
+    // Integer.MAX_VALUE = 0x7fffffff
+    testInteger(0x7fffffff, 4 + 1);
+    testInteger(0x7fffff, 3 + 1);
+    testInteger(0x7fff, 2 + 1);
+    testInteger(0x7f, 1);
+
+    // Long.MAX_VALUE = 0x7fffffffffffffffL
+    testLong(0x7fffffffffffffffL, 8 + 1);
+    testLong(0x7fffffffffffffL, 7 + 1);
+    testLong(0x7fffffffffffL, 6 + 1);
+    testLong(0x7fffffffffL, 5 + 1);
+    testLong(0x7fffffffL, 4 + 1);
+    testLong(0x7fffffL, 3 + 1);
+    testLong(0x7fffL, 2 + 1);
+    testLong(0x7fL, 1);
+  }
+
+  private void testInteger(int value, int length) throws Exception {
+    byte[] integerBuffer = new byte[5];
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos)) {
+      UnsynchronizedBuffer.writeVInt(dos, integerBuffer, value);
+      dos.flush();
+      assertEquals(length, baos.toByteArray().length);
+    }
+  }
+
+  private void testLong(long value, int length) throws Exception {
+    byte[] longBuffer = new byte[9];
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos)) {
+      UnsynchronizedBuffer.writeVLong(dos, longBuffer, value);
+      dos.flush();
+      assertEquals(length, baos.toByteArray().length);
+    }
+  }
+
+  @Test
+  public void compareWithWritableUtils() throws Exception {
+    byte[] hadoopBytes;
+    byte[] accumuloBytes;
+    int oneByteInt = 0x7f;
+    int threeByteInt = 0x7fff;
+    long sixByteLong = 0x7fffffffffL;
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos)) {
+      WritableUtils.writeVInt(dos, oneByteInt);
+      WritableUtils.writeVInt(dos, threeByteInt);
+      WritableUtils.writeVLong(dos, sixByteLong);
+      dos.flush();
+      hadoopBytes = baos.toByteArray();
+    }
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(baos)) {
+      UnsynchronizedBuffer.writeVInt(dos, new byte[5], oneByteInt);
+      UnsynchronizedBuffer.writeVInt(dos, new byte[5], threeByteInt);
+      UnsynchronizedBuffer.writeVLong(dos, new byte[9], sixByteLong);
+      dos.flush();
+      accumuloBytes = baos.toByteArray();
+    }
+    assertTrue("The byte array written to by UnsynchronizedBuffer is not equal 
to WritableUtils",
+        Arrays.equals(hadoopBytes, accumuloBytes));
+  }
 }
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java 
b/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java
index a86fd85..9dfd850 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/data/ServerMutation.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.thrift.TMutation;
+import org.apache.accumulo.core.util.UnsynchronizedBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 
@@ -57,8 +58,9 @@ public class ServerMutation extends Mutation {
 
   @Override
   public void write(DataOutput out) throws IOException {
+    final byte[] timeBuffer = new byte[9];
     super.write(out);
-    WritableUtils.writeVLong(out, systemTime);
+    UnsynchronizedBuffer.writeVLong(out, timeBuffer, systemTime);
   }
 
   public void setSystemTimestamp(long v) {

Reply via email to