This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new d8c9588 Use reflection based CRC32 to pass direct memory pointer
d8c9588 is described below
commit d8c95885136b46013ae942b3d32ae0ea66866dd2
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Mar 29 21:17:15 2018 +0800
Use reflection based CRC32 to pass direct memory pointer
Updating the CRC through an NIO `ByteBuffer` is very expensive, especially with
Netty versions newer than 4.1.12.
```java
crc.get().update(data.nioBuffer());
```
Converting a direct `ByteBuf` into a `ByteBuffer` with `data.nioBuffer()`
allocates an unpooled `DirectByteBuffer`, which is killing the GC: direct
memory is freed when the `DirectByteBuffer` instances are GCed, but the
resulting pauses are > 1 second.
This is an adaptation of a change from Yahoo branch at
https://github.com/yahoo/bookkeeper/commit/6c01ca2921a998f3bf2e85cacb27867773e7ea28
Initially I thought this change was not needed anymore in the 4.7 codebase, and
that is true for the CRC32c variant. For the regular CRC32, though, we need to
avoid the bad behavior of unpooled direct buffers.
Additionally, as in #1306, the thread-local variable was made static to
avoid leaking a separate instance for each ledger.
Author: Matteo Merli <[email protected]>
Reviewers: Jia Zhai <None>, Sijie Guo <[email protected]>
This closes #1307 from merlimat/crc32-bytebuffer and squashes the following
commits:
8b81245a0 [Matteo Merli] Made classed package private
a7ff1573f [Matteo Merli] Use reflection based CRC32 to pass direct memory
pointer
---
.../proto/checksum/CRC32DigestManager.java | 30 +++++--
.../proto/checksum/DirectMemoryCRC32Digest.java | 94 ++++++++++++++++++++++
...DigestManager.java => StandardCRC32Digest.java} | 35 ++++----
3 files changed, 131 insertions(+), 28 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
index 1f06640..2650828 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
@@ -19,13 +19,30 @@ package org.apache.bookkeeper.proto.checksum;
*/
import io.netty.buffer.ByteBuf;
-import java.util.zip.CRC32;
+import io.netty.util.concurrent.FastThreadLocal;
+/**
+ * Digest manager for CRC32 checksum.
+ */
class CRC32DigestManager extends DigestManager {
- private final ThreadLocal<CRC32> crc = new ThreadLocal<CRC32>() {
+
+ /**
+ * Interface that abstracts different implementations of the CRC32 digest.
+ */
+ interface CRC32Digest {
+ long getValueAndReset();
+
+ void update(ByteBuf buf);
+ }
+
+ private static final FastThreadLocal<CRC32Digest> crc = new
FastThreadLocal<CRC32Digest>() {
@Override
- protected CRC32 initialValue() {
- return new CRC32();
+ protected CRC32Digest initialValue() {
+ if (DirectMemoryCRC32Digest.isSupported()) {
+ return new DirectMemoryCRC32Digest();
+ } else {
+ return new StandardCRC32Digest();
+ }
}
};
@@ -40,12 +57,11 @@ class CRC32DigestManager extends DigestManager {
@Override
void populateValueAndReset(ByteBuf buf) {
- buf.writeLong(crc.get().getValue());
- crc.get().reset();
+ buf.writeLong(crc.get().getValueAndReset());
}
@Override
void update(ByteBuf data) {
- crc.get().update(data.nioBuffer());
+ crc.get().update(data);
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java
new file mode 100644
index 0000000..a895153
--- /dev/null
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java
@@ -0,0 +1,94 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.bookkeeper.proto.checksum;
+
+import io.netty.buffer.ByteBuf;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.zip.CRC32;
+
+import org.apache.bookkeeper.proto.checksum.CRC32DigestManager.CRC32Digest;
+
+/**
+ * Specialized implementation of CRC32 digest that uses reflection on {@link
CRC32} class to get access to
+ * "updateByteBuffer" method and pass a direct memory pointer.
+ */
+class DirectMemoryCRC32Digest implements CRC32Digest {
+
+ public static boolean isSupported() {
+ return updateBytes != null;
+ }
+
+ private int crcValue;
+
+ @Override
+ public long getValueAndReset() {
+ long value = crcValue & 0xffffffffL;
+ crcValue = 0;
+ return value;
+ }
+
+ @Override
+ public void update(ByteBuf buf) {
+ int index = buf.readerIndex();
+ int length = buf.readableBytes();
+
+ try {
+ if (buf.hasMemoryAddress()) {
+ // Calculate CRC directly from the direct memory pointer
+ crcValue = (int) updateByteBuffer.invoke(null, crcValue,
buf.memoryAddress(), index, length);
+ } else if (buf.hasArray()) {
+ // Use the internal method to update from array based
+ crcValue = (int) updateBytes.invoke(null, crcValue,
buf.array(), buf.arrayOffset() + index, length);
+ } else {
+ // Fallback to data copy if buffer is not contiguous
+ byte[] b = new byte[length];
+ buf.getBytes(index, b, 0, length);
+ crcValue = (int) updateBytes.invoke(null, crcValue, b, 0,
b.length);
+ }
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static final Method updateByteBuffer;
+ private static final Method updateBytes;
+
+ static {
+ // Access CRC32 class private native methods to compute the crc on the
ByteBuf direct memory,
+ // without necessity to convert to a nio ByteBuffer.
+ Method updateByteBufferMethod = null;
+ Method updateBytesMethod = null;
+ try {
+ updateByteBufferMethod =
CRC32.class.getDeclaredMethod("updateByteBuffer", int.class, long.class,
int.class,
+ int.class);
+ updateByteBufferMethod.setAccessible(true);
+
+ updateBytesMethod = CRC32.class.getDeclaredMethod("updateBytes",
int.class, byte[].class, int.class,
+ int.class);
+ updateBytesMethod.setAccessible(true);
+ } catch (NoSuchMethodException | SecurityException e) {
+ updateByteBufferMethod = null;
+ updateBytesMethod = null;
+ }
+
+ updateByteBuffer = updateByteBufferMethod;
+ updateBytes = updateBytesMethod;
+ }
+}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java
similarity index 62%
copy from
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
copy to
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java
index 1f06640..f103b14 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java
@@ -1,5 +1,3 @@
-package org.apache.bookkeeper.proto.checksum;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -17,35 +15,30 @@ package org.apache.bookkeeper.proto.checksum;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.bookkeeper.proto.checksum;
import io.netty.buffer.ByteBuf;
+
import java.util.zip.CRC32;
-class CRC32DigestManager extends DigestManager {
- private final ThreadLocal<CRC32> crc = new ThreadLocal<CRC32>() {
- @Override
- protected CRC32 initialValue() {
- return new CRC32();
- }
- };
+import org.apache.bookkeeper.proto.checksum.CRC32DigestManager.CRC32Digest;
- public CRC32DigestManager(long ledgerId) {
- super(ledgerId);
- }
+/**
+ * Regular implementation of CRC32 digest that makes use of {@link CRC32}
class.
+ */
+class StandardCRC32Digest implements CRC32Digest {
- @Override
- int getMacCodeLength() {
- return 8;
- }
+ private final CRC32 crc = new CRC32();
@Override
- void populateValueAndReset(ByteBuf buf) {
- buf.writeLong(crc.get().getValue());
- crc.get().reset();
+ public long getValueAndReset() {
+ long value = crc.getValue();
+ crc.reset();
+ return value;
}
@Override
- void update(ByteBuf data) {
- crc.get().update(data.nioBuffer());
+ public void update(ByteBuf buf) {
+ crc.update(buf.nioBuffer());
}
}
--
To stop receiving notification emails like this one, please contact
[email protected].