This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4f09e62f520 Improve varint encoding throughput with unrolled loop
(#29689)
4f09e62f520 is described below
commit 4f09e62f520d8a3bd40f9907e7343814cbd239ae
Author: Steven van Rossum <[email protected]>
AuthorDate: Wed Jan 31 23:23:50 2024 +0100
Improve varint encoding throughput with unrolled loop (#29689)
* Improve varint encoding throughput with unrolled loop
* Change BlackHole to Blackhole
* Add single byte encode tests
* Add missing L
* Remove public modifier
* Remove unused fields
---
.../apache/beam/sdk/jmh/util/VarIntBenchmark.java | 303 +++++++++++++++++++++
.../main/java/org/apache/beam/sdk/util/VarInt.java | 62 ++++-
2 files changed, 358 insertions(+), 7 deletions(-)
diff --git
a/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/util/VarIntBenchmark.java
b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/util/VarIntBenchmark.java
new file mode 100644
index 00000000000..964928d5055
--- /dev/null
+++
b/sdks/java/core/jmh/src/main/java/org/apache/beam/sdk/jmh/util/VarIntBenchmark.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.jmh.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Random;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.infra.Blackhole;
+
+/** Benchmarks for {@link org.apache.beam.sdk.util.VarInt} and variants. */
+@OperationsPerInvocation(VarIntBenchmark.VALUES_PER_INVOCATION)
+public class VarIntBenchmark {
+ static final int VALUES_PER_INVOCATION = 2048;
+ private static final Random RNG = new Random(314159);
+
+ /** Output to {@link Blackhole}. Do nothing, assume nothing. */
+ @State(Scope.Benchmark)
+ public static class BlackholeOutput {
+ OutputStream stream;
+
+ @Setup
+ public void setup(Blackhole bh) {
+ stream =
+ new OutputStream() {
+ @Override
+ public void write(int b) {
+ bh.consume(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ bh.consume(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ bh.consume(b);
+ }
+ };
+ }
+ }
+
+ /**
+ * Output to {@link ByteStringOutputStream}.
+ *
+ * <p>Declared with {@code Scope.Thread}; the single stream instance is reused across invocations
+ * and drained by {@link #tearDown(Blackhole)}.
+ */
+ @State(Scope.Thread)
+ public static class ByteStringOutput {
+ final ByteStringOutputStream stream = new ByteStringOutputStream();
+
+ // Unfortunately, this needs to be cleaned up after use to avoid OOMs.
+ // It's not generally recommended to use Level.Invocation, but there's no way around it.
+ // The drained result is handed to the Blackhole rather than discarded.
+ // NOTE(review): toByteStringAndReset() presumably returns the buffered bytes and empties the
+ // stream -- confirm against ByteStringOutputStream.
+ @TearDown(Level.Invocation)
+ public void tearDown(Blackhole bh) {
+ bh.consume(stream.toByteStringAndReset());
+ }
+ }
+
+ /**
+  * Input from randomly generated bytes.
+  *
+  * <p>Every sample is masked to its low seven bits, so each value lies in {@code [0, 127]}.
+  */
+ @State(Scope.Benchmark)
+ public static class Bytes {
+   long[] values = new long[VALUES_PER_INVOCATION];
+
+   /** Replaces {@link #values} with a freshly generated array of 7-bit samples. */
+   @Setup
+   public void setup() {
+     long[] fresh = new long[VALUES_PER_INVOCATION];
+     byte[] raw = new byte[VALUES_PER_INVOCATION];
+     RNG.nextBytes(raw);
+
+     int idx = 0;
+     while (idx < VALUES_PER_INVOCATION) {
+       // Clearing the top bit keeps the value non-negative and below 0x80.
+       fresh[idx] = raw[idx] & 0x7F;
+       idx++;
+     }
+     values = fresh;
+   }
+ }
+
+ /**
+  * Input from randomly generated longs.
+  *
+  * <p>Each sample is built so that its highest set bit falls in a randomly chosen 7-bit group
+  * (group 0 is bits 0-6, group 9 is bit 63). The group is drawn from a reshaped gaussian that
+  * favors the low groups.
+  */
+ @State(Scope.Benchmark)
+ public static class Longs {
+   long[] values = new long[VALUES_PER_INVOCATION];
+
+   /** Replaces {@link #values} with freshly generated samples drawn from the shared RNG. */
+   @Setup
+   public void setup() {
+     values = new long[VALUES_PER_INVOCATION];
+
+     for (int i = 0; i < VALUES_PER_INVOCATION; i++) {
+       // This gaussian random is used to determine the encoded output size of the sample.
+       // The distribution value is tweaked to favor small integers, positive more so than
+       // negative.
+       double g = RNG.nextGaussian();
+       double s = 3;
+       g = 10 * Math.min(Math.abs(g < 0 ? g + s : g / (s / 2)), s) / s;
+
+       // g lies in [0, 10] and is exactly 10 whenever the min() clamp to s applies. A long only
+       // has 7-bit groups 0..9, so cap the group index at 9. Without the cap numBits would be 70,
+       // and since Java uses only the low six bits of a long shift distance, both shifts below
+       // would behave as shifts by 6 and produce a 1-2 byte sample instead of a 10 byte one.
+       int group = Math.min((int) g, 9);
+
+       // Construct a bitmask that keeps the low (numBits + 7) bits of the input.
+       // Find the lowest bit to set in the 7 bit segment starting at numBits, so the sample
+       // always occupies exactly (group + 1) seven-bit groups.
+       int numBits = 7 * group;
+       long mask = ~(~0x7fL << numBits);
+       long low = 1L << numBits;
+
+       values[i] = (RNG.nextLong() & mask) | low;
+     }
+   }
+ }
+
+ // Used in Beam 2.52.0
+ // Writes v to stream in 7-bit groups, least-significant group first. Every byte except the
+ // last has its 0x80 bit set. v is shifted with >>>, i.e. treated as unsigned, so a negative
+ // long is written as 10 bytes. May throw IOException from stream.write.
+ static void encodeDoLoop(long v, OutputStream stream) throws IOException {
+ do {
+ // Encode next 7 bits + terminator bit
+ long bits = v & 0x7F;
+ v >>>= 7;
+ // 0x80 is OR-ed in only while further non-zero groups remain.
+ byte b = (byte) (bits | ((v != 0) ? 0x80 : 0));
+ stream.write(b);
+ } while (v != 0);
+ }
+
+ // A tweak of encodeDoLoop, replacing a compare with a few bitwise operations.
+ // Same loop structure as encodeDoLoop; only the computation of the continuation bit differs.
+ static void encodeDoLoopTwiddle(long v, OutputStream stream) throws
IOException {
+ do {
+ // Encode next 7 bits + terminator bit
+ long bits = v & 0x7F;
+ v >>>= 7;
+ // After the unsigned shift v is non-negative, so -v is negative exactly when v != 0.
+ // The arithmetic shift by 63 turns that sign into all ones (or zero), and & 0x80 keeps
+ // just the continuation bit.
+ long cont = (-v >> 63) & 0x80;
+ byte b = (byte) (bits | cont);
+ stream.write(b);
+ } while (v != 0);
+ }
+
+ // Use a mask check to do less work for 1 byte output.
+ // While any bit above the low seven is set, writes the low byte of v with 0x80 forced on and
+ // shifts v right (unsigned) by 7; the final group is written without the 0x80 bit.
+ static void encodeLoop(long v, OutputStream stream) throws IOException {
+ while ((v & ~0x7FL) != 0) {
+ // The cast to byte keeps only the low 8 bits: 7 payload bits plus the continuation bit.
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ }
+ stream.write((byte) v);
+ }
+
+ // As encodeLoop, but unrolled.
+ // Nine explicit "fits in 7 bits?" checks followed by an unconditional final write, for a
+ // maximum of 10 bytes (9 * 7 = 63 bits shifted out, leaving at most one bit for the last byte).
+ // The mask ~0x7F is an int (-128); widening to long sign-extends it, so it tests the same bits
+ // as the ~0x7FL used in encodeLoop.
+ static void encodeUnrolled(long v, OutputStream stream) throws IOException {
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ // Tenth byte: after nine shifts at most bit 0 can still be set, so no check is needed.
+ stream.write((byte) (v));
+ }
+
+ /** Runs {@code encodeDoLoop} over every {@link Longs} sample, writing to the Blackhole stream. */
+ @Benchmark
+ public void encodeDoLoopBlackhole(Longs input, BlackholeOutput output) throws IOException {
+   final long[] samples = input.values;
+   for (int idx = 0; idx < samples.length; idx++) {
+     encodeDoLoop(samples[idx], output.stream);
+   }
+ }
+
+ /** Runs {@code encodeDoLoop} over every {@link Longs} sample, writing to the ByteString stream. */
+ @Benchmark
+ public void encodeDoLoopByteString(Longs input, ByteStringOutput output) throws IOException {
+   final long[] samples = input.values;
+   for (int idx = 0; idx < samples.length; idx++) {
+     encodeDoLoop(samples[idx], output.stream);
+   }
+ }
+
+ /** Runs {@code encodeDoLoopTwiddle} over every {@link Longs} sample into the Blackhole stream. */
+ @Benchmark
+ public void encodeDoLoopTwiddleBlackhole(Longs input, BlackholeOutput output) throws IOException {
+   final long[] samples = input.values;
+   for (int idx = 0; idx < samples.length; idx++) {
+     encodeDoLoopTwiddle(samples[idx], output.stream);
+   }
+ }
+
+ /** Runs {@code encodeDoLoopTwiddle} over every {@link Longs} sample into the ByteString stream. */
+ @Benchmark
+ public void encodeDoLoopTwiddleByteString(Longs input, ByteStringOutput output)
+     throws IOException {
+   final long[] samples = input.values;
+   for (int idx = 0; idx < samples.length; idx++) {
+     encodeDoLoopTwiddle(samples[idx], output.stream);
+   }
+ }
+
+ /** Runs {@code encodeLoop} over every {@link Longs} sample, writing to the Blackhole stream. */
+ @Benchmark
+ public void encodeLoopBlackhole(Longs input, BlackholeOutput output) throws IOException {
+   final long[] samples = input.values;
+   for (int idx = 0; idx < samples.length; idx++) {
+     encodeLoop(samples[idx], output.stream);
+   }
+ }
+
+ /** Runs {@code encodeLoop} over every {@link Longs} sample, writing to the ByteString stream. */
+ @Benchmark
+ public void encodeLoopByteString(Longs input, ByteStringOutput output) throws IOException {
+   final long[] samples = input.values;
+   for (int idx = 0; idx < samples.length; idx++) {
+     encodeLoop(samples[idx], output.stream);
+   }
+ }
+
+ /** Runs {@code encodeUnrolled} over every {@link Longs} sample, writing to the Blackhole stream. */
+ @Benchmark
+ public void encodeUnrolledBlackhole(Longs input, BlackholeOutput output) throws IOException {
+   final long[] samples = input.values;
+   for (int idx = 0; idx < samples.length; idx++) {
+     encodeUnrolled(samples[idx], output.stream);
+   }
+ }
+
+ /** Runs {@code encodeUnrolled} over every {@link Longs} sample into the ByteString stream. */
+ @Benchmark
+ public void encodeUnrolledByteString(Longs input, ByteStringOutput output) throws IOException {
+   final long[] samples = input.values;
+   for (int idx = 0; idx < samples.length; idx++) {
+     encodeUnrolled(samples[idx], output.stream);
+   }
+ }
+
+ /** Runs {@code encodeDoLoop} over every 7-bit {@link Bytes} sample into the ByteString stream. */
+ @Benchmark
+ public void singleByteEncodeDoLoopByteString(Bytes input, ByteStringOutput output)
+     throws IOException {
+   final long[] samples = input.values;
+   for (int idx = 0; idx < samples.length; idx++) {
+     encodeDoLoop(samples[idx], output.stream);
+   }
+ }
+
+ /** Runs {@code encodeDoLoopTwiddle} over every 7-bit {@link Bytes} sample (ByteString stream). */
+ @Benchmark
+ public void singleByteEncodeDoLoopTwiddleByteString(Bytes input, ByteStringOutput output)
+     throws IOException {
+   final long[] samples = input.values;
+   for (int idx = 0; idx < samples.length; idx++) {
+     encodeDoLoopTwiddle(samples[idx], output.stream);
+   }
+ }
+
+ /** Runs {@code encodeLoop} over every 7-bit {@link Bytes} sample into the ByteString stream. */
+ @Benchmark
+ public void singleByteEncodeLoopByteString(Bytes input, ByteStringOutput output)
+     throws IOException {
+   final long[] samples = input.values;
+   for (int idx = 0; idx < samples.length; idx++) {
+     encodeLoop(samples[idx], output.stream);
+   }
+ }
+
+ /** Runs {@code encodeUnrolled} over every 7-bit {@link Bytes} sample into the ByteString stream. */
+ @Benchmark
+ public void singleByteEncodeUnrolledByteString(Bytes input, ByteStringOutput output)
+     throws IOException {
+   final long[] samples = input.values;
+   for (int idx = 0; idx < samples.length; idx++) {
+     encodeUnrolled(samples[idx], output.stream);
+   }
+ }
+}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java
index 239cbafc174..5432383f5ad 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/VarInt.java
@@ -42,13 +42,61 @@ public class VarInt {
/** Encodes the given value onto the stream. */
public static void encode(long v, OutputStream stream) throws IOException {
- do {
- // Encode next 7 bits + terminator bit
- long bits = v & 0x7F;
- v >>>= 7;
- byte b = (byte) (bits | ((v != 0) ? 0x80 : 0));
- stream.write(b);
- } while (v != 0);
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ if ((v & ~0x7F) == 0) {
+ stream.write((byte) v);
+ return;
+ }
+ stream.write((byte) (v | 0x80));
+ v >>>= 7;
+ stream.write((byte) (v));
}
/** Decodes an integer value from the given stream. */