This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aecd47b3bc6 KAFKA-13900 Support Java 9 direct ByteBuffer Checksum
methods (#12163)
aecd47b3bc6 is described below
commit aecd47b3bc691c3158463dab2c698a3ba9eb2f98
Author: Francesco Nigro <[email protected]>
AuthorDate: Sat Aug 13 05:09:15 2022 +0200
KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods (#12163)
Some numbers with JDK 11.
Before:
```
Benchmark (bytes) (direct) (readonly) (seed) Mode Cnt
Score Error Units
Crc32CBenchmark.checksum 128 false false 42 thrpt 20
26.730 ± 0.410 ops/us
Crc32CBenchmark.checksum 128 true false 42 thrpt 20
1.781 ± 0.007 ops/us
Crc32CBenchmark.checksum 1024 false false 42 thrpt 20
6.553 ± 0.053 ops/us
Crc32CBenchmark.checksum 1024 true false 42 thrpt 20
0.223 ± 0.001 ops/us
Crc32CBenchmark.checksum 4096 false false 42 thrpt 20
4.054 ± 0.015 ops/us
Crc32CBenchmark.checksum 4096 true false 42 thrpt 20
0.056 ± 0.001 ops/us
```
And this PR:
```
Benchmark (bytes) (direct) (readonly) (seed) Mode Cnt
Score Error Units
Crc32CBenchmark.checksum 128 false false 42 thrpt 20
26.922 ± 0.065 ops/us
Crc32CBenchmark.checksum 128 true false 42 thrpt 20
24.656 ± 0.620 ops/us
Crc32CBenchmark.checksum 1024 false false 42 thrpt 20
6.548 ± 0.025 ops/us
Crc32CBenchmark.checksum 1024 true false 42 thrpt 20
6.432 ± 0.136 ops/us
Crc32CBenchmark.checksum 4096 false false 42 thrpt 20
4.031 ± 0.022 ops/us
Crc32CBenchmark.checksum 4096 true false 42 thrpt 20
4.004 ± 0.016 ops/us
```
The purpose of the PR is to make heap and direct ByteBuffers able to
perform the same (especially when
not read-only), without affecting the existing heap ByteBuffer performance.
Reviewers: Ismael Juma <[email protected]>, Divij Vaidya <[email protected]>
---
.../org/apache/kafka/common/utils/Checksums.java | 55 +++++++++++++++-
.../org/apache/kafka/jmh/util/Crc32CBenchmark.java | 73 ++++++++++++++++++++++
2 files changed, 127 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Checksums.java
b/clients/src/main/java/org/apache/kafka/common/utils/Checksums.java
index 679b59249dc..17a31edc9a9 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Checksums.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Checksums.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.common.utils;
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
import java.nio.ByteBuffer;
import java.util.zip.Checksum;
@@ -29,22 +32,72 @@ import java.util.zip.Checksum;
* NOTE: This class is intended for INTERNAL usage only within Kafka.
*/
public final class Checksums {
+ private static final MethodHandle BYTE_BUFFER_UPDATE;
+
+ static {
+ MethodHandle byteBufferUpdate = null;
+ if (Java.IS_JAVA9_COMPATIBLE) {
+ try {
+ byteBufferUpdate =
MethodHandles.publicLookup().findVirtual(Checksum.class, "update",
+ MethodType.methodType(void.class, ByteBuffer.class));
+ } catch (Throwable t) {
+ handleUpdateThrowable(t);
+ }
+ }
+ BYTE_BUFFER_UPDATE = byteBufferUpdate;
+ }
private Checksums() {
}
+ /**
+ * Uses {@link Checksum#update} on {@code buffer}'s content, without
modifying its position and limit.<br>
+ * This is semantically equivalent to {@link #update(Checksum, ByteBuffer,
int, int)} with {@code offset = 0}.
+ */
public static void update(Checksum checksum, ByteBuffer buffer, int
length) {
update(checksum, buffer, 0, length);
}
+ /**
+ * Uses {@link Checksum#update} on {@code buffer}'s content, starting from
the given {@code offset}
+ * by the provided {@code length}, without modifying its position and
limit.
+ */
public static void update(Checksum checksum, ByteBuffer buffer, int
offset, int length) {
if (buffer.hasArray()) {
checksum.update(buffer.array(), buffer.position() +
buffer.arrayOffset() + offset, length);
+ } else if (BYTE_BUFFER_UPDATE != null && buffer.isDirect()) {
+ final int oldPosition = buffer.position();
+ final int oldLimit = buffer.limit();
+ try {
+ // save a slice to be used to save an allocation in the
hot-path
+ final int start = oldPosition + offset;
+ buffer.limit(start + length);
+ buffer.position(start);
+ BYTE_BUFFER_UPDATE.invokeExact(checksum, buffer);
+ } catch (Throwable t) {
+ handleUpdateThrowable(t);
+ } finally {
+ // reset buffer's offsets
+ buffer.limit(oldLimit);
+ buffer.position(oldPosition);
+ }
} else {
+ // slow-path
int start = buffer.position() + offset;
- for (int i = start; i < start + length; i++)
+ for (int i = start; i < start + length; i++) {
checksum.update(buffer.get(i));
+ }
+ }
+ }
+
+ private static void handleUpdateThrowable(Throwable t) {
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ }
+ if (t instanceof Error) {
+ throw (Error) t;
}
+ throw new IllegalStateException(t);
}
public static void updateInt(Checksum checksum, int input) {
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/Crc32CBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/Crc32CBenchmark.java
new file mode 100644
index 00000000000..6f60a0f43e0
--- /dev/null
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/Crc32CBenchmark.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Crc32C;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+import java.util.concurrent.TimeUnit;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@Fork(2)
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 10, time = 1)
+public class Crc32CBenchmark {
+
+ @Param({"false", "true"})
+ private boolean direct;
+
+
+ @Param({"false", "true"})
+ private boolean readonly;
+
+ @Param({"42"})
+ private int seed;
+
+ @Param({"128", "1024", "4096"})
+ private int bytes;
+
+ private ByteBuffer input;
+
+ @Setup
+ public void setup() {
+ SplittableRandom random = new SplittableRandom(seed);
+ input = direct ? ByteBuffer.allocateDirect(bytes) :
ByteBuffer.allocate(bytes);
+ for (int o = 0; o < bytes; o++) {
+ input.put(o, (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE
+ 1));
+ }
+ if (readonly) {
+ input = input.asReadOnlyBuffer();
+ }
+ }
+
+ @Benchmark
+ public long checksum() {
+ return Crc32C.compute(input, 0, bytes);
+ }
+
+}