This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new d8c9588  Use reflection based CRC32 to pass direct memory pointer
d8c9588 is described below

commit d8c95885136b46013ae942b3d32ae0ea66866dd2
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 29 21:17:15 2018 +0800

    Use reflection based CRC32 to pass direct memory pointer
    
    The crc update with the bytebuffer is very expensive, especially in netty > 
4.1.12.
    
    ```java
    crc.get().update(data.nioBuffer());
    ```
    
    Converting a direct`ByteBuf` into a `ByteBuffer` with `data.nioBuffer()` is 
actually resorting to allocating an unpooled `DirectByteBuffer` which is 
actually killing the GC (since direct memory is freed when the 
`DirectByteBuffer` instances are GCed, but the pauses are > 1 second).
    
    This is an adaptation of a change from Yahoo branch at 
https://github.com/yahoo/bookkeeper/commit/6c01ca2921a998f3bf2e85cacb27867773e7ea28
    
    Initially I though this change was not needed anymore in 4.7 codebase, and 
that is true for the CRC32c variant. For the regular CRC32, though, we need to 
avoid the bad behavior of unpooled direct buffers.
    
    Additionally, as in #1306, the thread local variable was made static to 
avoid the leakage of instances per each ledger.
    
    Author: Matteo Merli <[email protected]>
    
    Reviewers: Jia Zhai <None>, Sijie Guo <[email protected]>
    
    This closes #1307 from merlimat/crc32-bytebuffer and squashes the following 
commits:
    
    8b81245a0 [Matteo Merli] Made classed package private
    a7ff1573f [Matteo Merli] Use reflection based CRC32 to pass direct memory 
pointer
---
 .../proto/checksum/CRC32DigestManager.java         | 30 +++++--
 .../proto/checksum/DirectMemoryCRC32Digest.java    | 94 ++++++++++++++++++++++
 ...DigestManager.java => StandardCRC32Digest.java} | 35 ++++----
 3 files changed, 131 insertions(+), 28 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
index 1f06640..2650828 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
@@ -19,13 +19,30 @@ package org.apache.bookkeeper.proto.checksum;
 */
 
 import io.netty.buffer.ByteBuf;
-import java.util.zip.CRC32;
+import io.netty.util.concurrent.FastThreadLocal;
 
+/**
+ * Digest manager for CRC32 checksum.
+ */
 class CRC32DigestManager extends DigestManager {
-    private final ThreadLocal<CRC32> crc = new ThreadLocal<CRC32>() {
+
+    /**
+     * Interface that abstracts different implementations of the CRC32 digest.
+     */
+    interface CRC32Digest {
+        long getValueAndReset();
+
+        void update(ByteBuf buf);
+    }
+
+    private static final FastThreadLocal<CRC32Digest> crc = new 
FastThreadLocal<CRC32Digest>() {
         @Override
-        protected CRC32 initialValue() {
-            return new CRC32();
+        protected CRC32Digest initialValue() {
+            if (DirectMemoryCRC32Digest.isSupported()) {
+                return new DirectMemoryCRC32Digest();
+            } else {
+                return new StandardCRC32Digest();
+            }
         }
     };
 
@@ -40,12 +57,11 @@ class CRC32DigestManager extends DigestManager {
 
     @Override
     void populateValueAndReset(ByteBuf buf) {
-        buf.writeLong(crc.get().getValue());
-        crc.get().reset();
+        buf.writeLong(crc.get().getValueAndReset());
     }
 
     @Override
     void update(ByteBuf data) {
-        crc.get().update(data.nioBuffer());
+        crc.get().update(data);
     }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java
new file mode 100644
index 0000000..a895153
--- /dev/null
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java
@@ -0,0 +1,94 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.bookkeeper.proto.checksum;
+
+import io.netty.buffer.ByteBuf;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.zip.CRC32;
+
+import org.apache.bookkeeper.proto.checksum.CRC32DigestManager.CRC32Digest;
+
+/**
+ * Specialized implementation of CRC32 digest that uses reflection on {@link 
CRC32} class to get access to
+ * "updateByteBuffer" method and pass a direct memory pointer.
+ */
+class DirectMemoryCRC32Digest implements CRC32Digest {
+
+    public static boolean isSupported() {
+        return updateBytes != null;
+    }
+
+    private int crcValue;
+
+    @Override
+    public long getValueAndReset() {
+        long value = crcValue & 0xffffffffL;
+        crcValue = 0;
+        return value;
+    }
+
+    @Override
+    public void update(ByteBuf buf) {
+        int index = buf.readerIndex();
+        int length = buf.readableBytes();
+
+        try {
+            if (buf.hasMemoryAddress()) {
+                // Calculate CRC directly from the direct memory pointer
+                crcValue = (int) updateByteBuffer.invoke(null, crcValue, 
buf.memoryAddress(), index, length);
+            } else if (buf.hasArray()) {
+                // Use the internal method to update from array based
+                crcValue = (int) updateBytes.invoke(null, crcValue, 
buf.array(), buf.arrayOffset() + index, length);
+            } else {
+                // Fallback to data copy if buffer is not contiguous
+                byte[] b = new byte[length];
+                buf.getBytes(index, b, 0, length);
+                crcValue = (int) updateBytes.invoke(null, crcValue, b, 0, 
b.length);
+            }
+        } catch (IllegalAccessException | InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static final Method updateByteBuffer;
+    private static final Method updateBytes;
+
+    static {
+        // Access CRC32 class private native methods to compute the crc on the 
ByteBuf direct memory,
+        // without necessity to convert to a nio ByteBuffer.
+        Method updateByteBufferMethod = null;
+        Method updateBytesMethod = null;
+        try {
+            updateByteBufferMethod = 
CRC32.class.getDeclaredMethod("updateByteBuffer", int.class, long.class, 
int.class,
+                    int.class);
+            updateByteBufferMethod.setAccessible(true);
+
+            updateBytesMethod = CRC32.class.getDeclaredMethod("updateBytes", 
int.class, byte[].class, int.class,
+                    int.class);
+            updateBytesMethod.setAccessible(true);
+        } catch (NoSuchMethodException | SecurityException e) {
+            updateByteBufferMethod = null;
+            updateBytesMethod = null;
+        }
+
+        updateByteBuffer = updateByteBufferMethod;
+        updateBytes = updateBytesMethod;
+    }
+}
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java
similarity index 62%
copy from 
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
copy to 
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java
index 1f06640..f103b14 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.proto.checksum;
-
 /*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
@@ -17,35 +15,30 @@ package org.apache.bookkeeper.proto.checksum;
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
+package org.apache.bookkeeper.proto.checksum;
 
 import io.netty.buffer.ByteBuf;
+
 import java.util.zip.CRC32;
 
-class CRC32DigestManager extends DigestManager {
-    private final ThreadLocal<CRC32> crc = new ThreadLocal<CRC32>() {
-        @Override
-        protected CRC32 initialValue() {
-            return new CRC32();
-        }
-    };
+import org.apache.bookkeeper.proto.checksum.CRC32DigestManager.CRC32Digest;
 
-    public CRC32DigestManager(long ledgerId) {
-        super(ledgerId);
-    }
+/**
+ * Regular implementation of CRC32 digest that makes use of {@link CRC32} 
class.
+ */
+class StandardCRC32Digest implements CRC32Digest {
 
-    @Override
-    int getMacCodeLength() {
-        return 8;
-    }
+    private final CRC32 crc = new CRC32();
 
     @Override
-    void populateValueAndReset(ByteBuf buf) {
-        buf.writeLong(crc.get().getValue());
-        crc.get().reset();
+    public long getValueAndReset() {
+        long value = crc.getValue();
+        crc.reset();
+        return value;
     }
 
     @Override
-    void update(ByteBuf data) {
-        crc.get().update(data.nioBuffer());
+    public void update(ByteBuf buf) {
+        crc.update(buf.nioBuffer());
     }
 }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to