This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aecd47b3bc6 KAFKA-13900 Support Java 9 direct ByteBuffer Checksum 
methods (#12163)
aecd47b3bc6 is described below

commit aecd47b3bc691c3158463dab2c698a3ba9eb2f98
Author: Francesco Nigro <[email protected]>
AuthorDate: Sat Aug 13 05:09:15 2022 +0200

    KAFKA-13900 Support Java 9 direct ByteBuffer Checksum methods (#12163)
    
    Some numbers with JDK 11.
    
    Before:
    ```
    Benchmark                 (bytes)  (direct)  (readonly)  (seed)   Mode  Cnt 
  Score    Error   Units
    Crc32CBenchmark.checksum      128     false       false      42  thrpt   20 
 26.730 ±  0.410  ops/us
    Crc32CBenchmark.checksum      128      true       false      42  thrpt   20 
  1.781 ±  0.007  ops/us
    Crc32CBenchmark.checksum     1024     false       false      42  thrpt   20 
  6.553 ±  0.053  ops/us
    Crc32CBenchmark.checksum     1024      true       false      42  thrpt   20 
  0.223 ±  0.001  ops/us
    Crc32CBenchmark.checksum     4096     false       false      42  thrpt   20 
  4.054 ±  0.015  ops/us
    Crc32CBenchmark.checksum     4096      true       false      42  thrpt   20 
  0.056 ±  0.001  ops/us
    ```
    
    And this PR:
    ```
    Benchmark                 (bytes)  (direct)  (readonly)  (seed)   Mode  Cnt 
  Score   Error   Units
    Crc32CBenchmark.checksum      128     false       false      42  thrpt   20 
 26.922 ± 0.065  ops/us
    Crc32CBenchmark.checksum      128      true       false      42  thrpt   20 
 24.656 ± 0.620  ops/us
    Crc32CBenchmark.checksum     1024     false       false      42  thrpt   20 
  6.548 ± 0.025  ops/us
    Crc32CBenchmark.checksum     1024      true       false      42  thrpt   20 
  6.432 ± 0.136  ops/us
    Crc32CBenchmark.checksum     4096     false       false      42  thrpt   20 
  4.031 ± 0.022  ops/us
    Crc32CBenchmark.checksum     4096      true       false      42  thrpt   20 
  4.004 ± 0.016  ops/us
    ```
    
    The purpose of the PR is to makes heap and direct ByteBuffer able to 
perform the same (especially
    not read-only), without affecting the existing heap ByteBuffer performance.
    
    Reviewers: Ismael Juma <[email protected]>, Divij Vaidya <[email protected]>
---
 .../org/apache/kafka/common/utils/Checksums.java   | 55 +++++++++++++++-
 .../org/apache/kafka/jmh/util/Crc32CBenchmark.java | 73 ++++++++++++++++++++++
 2 files changed, 127 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Checksums.java 
b/clients/src/main/java/org/apache/kafka/common/utils/Checksums.java
index 679b59249dc..17a31edc9a9 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Checksums.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Checksums.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
 import java.nio.ByteBuffer;
 import java.util.zip.Checksum;
 
@@ -29,22 +32,72 @@ import java.util.zip.Checksum;
  * NOTE: This class is intended for INTERNAL usage only within Kafka.
  */
 public final class Checksums {
+    private static final MethodHandle BYTE_BUFFER_UPDATE;
+
+    static {
+        MethodHandle byteBufferUpdate = null;
+        if (Java.IS_JAVA9_COMPATIBLE) {
+            try {
+                byteBufferUpdate = 
MethodHandles.publicLookup().findVirtual(Checksum.class, "update",
+                    MethodType.methodType(void.class, ByteBuffer.class));
+            } catch (Throwable t) {
+                handleUpdateThrowable(t);
+            }
+        }
+        BYTE_BUFFER_UPDATE = byteBufferUpdate;
+    }
 
     private Checksums() {
     }
 
+    /**
+     * Uses {@link Checksum#update} on {@code buffer}'s content, without 
modifying its position and limit.<br>
+     * This is semantically equivalent to {@link #update(Checksum, ByteBuffer, 
int, int)} with {@code offset = 0}.
+     */
     public static void update(Checksum checksum, ByteBuffer buffer, int 
length) {
         update(checksum, buffer, 0, length);
     }
 
+    /**
+     * Uses {@link Checksum#update} on {@code buffer}'s content, starting from 
the given {@code offset}
+     * by the provided {@code length}, without modifying its position and 
limit.
+     */
     public static void update(Checksum checksum, ByteBuffer buffer, int 
offset, int length) {
         if (buffer.hasArray()) {
             checksum.update(buffer.array(), buffer.position() + 
buffer.arrayOffset() + offset, length);
+        } else if (BYTE_BUFFER_UPDATE != null && buffer.isDirect()) {
+            final int oldPosition = buffer.position();
+            final int oldLimit = buffer.limit();
+            try {
+                // save a slice to be used to save an allocation in the 
hot-path
+                final int start = oldPosition + offset;
+                buffer.limit(start + length);
+                buffer.position(start);
+                BYTE_BUFFER_UPDATE.invokeExact(checksum, buffer);
+            } catch (Throwable t) {
+                handleUpdateThrowable(t);
+            } finally {
+                // reset buffer's offsets
+                buffer.limit(oldLimit);
+                buffer.position(oldPosition);
+            }
         } else {
+            // slow-path
             int start = buffer.position() + offset;
-            for (int i = start; i < start + length; i++)
+            for (int i = start; i < start + length; i++) {
                 checksum.update(buffer.get(i));
+            }
+        }
+    }
+
+    private static void handleUpdateThrowable(Throwable t) {
+        if (t instanceof RuntimeException) {
+            throw (RuntimeException) t;
+        }
+        if (t instanceof Error) {
+            throw (Error) t;
         }
+        throw new IllegalStateException(t);
     }
     
     public static void updateInt(Checksum checksum, int input) {
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/Crc32CBenchmark.java 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/Crc32CBenchmark.java
new file mode 100644
index 00000000000..6f60a0f43e0
--- /dev/null
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/util/Crc32CBenchmark.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.util;
+
+import org.apache.kafka.common.utils.Crc32C;
+
+import java.nio.ByteBuffer;
+import java.util.SplittableRandom;
+import java.util.concurrent.TimeUnit;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@Fork(2)
+@Warmup(iterations = 5, time = 1)
+@Measurement(iterations = 10, time = 1)
+public class Crc32CBenchmark {
+
+    @Param({"false", "true"})
+    private boolean direct;
+
+
+    @Param({"false", "true"})
+    private boolean readonly;
+
+    @Param({"42"})
+    private int seed;
+
+    @Param({"128", "1024", "4096"})
+    private int bytes;
+
+    private ByteBuffer input;
+
+    @Setup
+    public void setup() {
+        SplittableRandom random = new SplittableRandom(seed);
+        input = direct ? ByteBuffer.allocateDirect(bytes) : 
ByteBuffer.allocate(bytes);
+        for (int o = 0; o < bytes; o++) {
+            input.put(o, (byte) random.nextInt(Byte.MIN_VALUE, Byte.MAX_VALUE 
+ 1));
+        }
+        if (readonly) {
+            input = input.asReadOnlyBuffer();
+        }
+    }
+
+    @Benchmark
+    public long checksum() {
+        return Crc32C.compute(input, 0, bytes);
+    }
+
+}

Reply via email to