Repository: kafka
Updated Branches:
refs/heads/0.9.0 4f3bdca5b -> 7d37086e5
KAFKA-2882: Add constructor cache for Snappy and LZ4 Output/Input streams in Compressor.java
In the `wrapForOutput` and `wrapForInput` methods of `org.apache.kafka.common.record.Compressor`,
`Class.forName("[compression codec]")` and `getConstructor` are invoked on every `wrapForOutput` /
`wrapForInput` call. Reflection calls are expensive and hurt performance at high volumes. This patch
adds a cache for the `Constructor` objects to reduce the reflection overhead.
In our production deployments, this has reduced producer CPU usage by about 20%.
Author: Maksim Logvinenko <[email protected]>
Reviewers: Ismael Juma
Closes #580 from logarithm/compressor-getclass-cache
(cherry picked from commit 4a0e011be3d038763d6326bb0092524f809c3f4d)
Signed-off-by: Guozhang Wang <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d37086e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d37086e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d37086e
Branch: refs/heads/0.9.0
Commit: 7d37086e5e3225b003fc353b86a130a56c1a469b
Parents: 4f3bdca
Author: Maksim Logvinenko <[email protected]>
Authored: Thu Nov 26 22:18:21 2015 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Thu Nov 26 22:18:35 2015 -0800
----------------------------------------------------------------------
.../apache/kafka/common/record/Compressor.java | 87 ++++++++++++++++----
1 file changed, 69 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d37086e/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 27f757a..1aee389 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;
+import java.lang.reflect.Constructor;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Utils;
@@ -46,6 +47,40 @@ public class Compressor {
}
}
+    // dynamically load the snappy and lz4 classes to avoid a runtime dependency on them when compression is not used;
+    // the constructors are cached so that Class.forName/getConstructor are not invoked for every batch
+    private static final MemoizingConstructorSupplier snappyOutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor<?> get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.xerial.snappy.SnappyOutputStream")
+                .getConstructor(OutputStream.class, Integer.TYPE);
+        }
+    });
+
+    private static final MemoizingConstructorSupplier lz4OutputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor<?> get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream")
+                .getConstructor(OutputStream.class);
+        }
+    });
+
+    private static final MemoizingConstructorSupplier snappyInputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor<?> get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.xerial.snappy.SnappyInputStream")
+                .getConstructor(InputStream.class);
+        }
+    });
+
+    private static final MemoizingConstructorSupplier lz4InputStreamSupplier = new MemoizingConstructorSupplier(new ConstructorSupplier() {
+        @Override
+        public Constructor<?> get() throws ClassNotFoundException, NoSuchMethodException {
+            return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
+                .getConstructor(InputStream.class);
+        }
+    });
+
private final CompressionType type;
private final DataOutputStream appendStream;
private final ByteBufferOutputStream bufferStream;
@@ -79,7 +114,7 @@ public class Compressor {
public ByteBuffer buffer() {
return bufferStream.buffer();
}
-
+
public double compressionRate() {
ByteBuffer buffer = bufferStream.buffer();
if (this.writtenUncompressed == 0)
@@ -209,21 +244,15 @@ public class Compressor {
case GZIP:
return new DataOutputStream(new GZIPOutputStream(buffer,
bufferSize));
case SNAPPY:
- // dynamically load the snappy class to avoid runtime
dependency
- // on snappy if we are not using it
try {
- Class<?> outputStreamClass =
Class.forName("org.xerial.snappy.SnappyOutputStream");
- OutputStream stream = (OutputStream)
outputStreamClass.getConstructor(OutputStream.class, Integer.TYPE)
- .newInstance(buffer, bufferSize);
+ OutputStream stream = (OutputStream)
snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
return new DataOutputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
}
case LZ4:
try {
- Class<?> outputStreamClass =
Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockOutputStream");
- OutputStream stream = (OutputStream)
outputStreamClass.getConstructor(OutputStream.class)
- .newInstance(buffer);
+ OutputStream stream = (OutputStream)
lz4OutputStreamSupplier.get().newInstance(buffer);
return new DataOutputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
@@ -244,22 +273,15 @@ public class Compressor {
case GZIP:
return new DataInputStream(new GZIPInputStream(buffer));
case SNAPPY:
- // dynamically load the snappy class to avoid runtime
dependency
- // on snappy if we are not using it
try {
- Class<?> inputStreamClass =
Class.forName("org.xerial.snappy.SnappyInputStream");
- InputStream stream = (InputStream)
inputStreamClass.getConstructor(InputStream.class)
- .newInstance(buffer);
+ InputStream stream = (InputStream)
snappyInputStreamSupplier.get().newInstance(buffer);
return new DataInputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
}
case LZ4:
- // dynamically load LZ4 class to avoid runtime dependency
try {
- Class<?> inputStreamClass =
Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream");
- InputStream stream = (InputStream)
inputStreamClass.getConstructor(InputStream.class)
- .newInstance(buffer);
+ InputStream stream = (InputStream)
lz4InputStreamSupplier.get().newInstance(buffer);
return new DataInputStream(stream);
} catch (Exception e) {
throw new KafkaException(e);
@@ -271,4 +293,33 @@ public class Compressor {
throw new KafkaException(e);
}
}
+
+    // Looks up a constructor reflectively; fails if the class or the constructor cannot be found.
+    private interface ConstructorSupplier {
+        Constructor<?> get() throws ClassNotFoundException, NoSuchMethodException;
+    }
+
+    // Caches the delegate's constructor after the first successful lookup (adapted from Guava's
+    // Suppliers.MemoizingSupplier); a failed lookup caches nothing, so the next call tries again.
+    private static class MemoizingConstructorSupplier {
+        private final ConstructorSupplier delegate;
+        private volatile boolean initialized; // written after 'value', so a reader that sees true also sees 'value'
+        private Constructor<?> value;
+
+        MemoizingConstructorSupplier(ConstructorSupplier delegate) {
+            this.delegate = delegate;
+        }
+
+        public Constructor<?> get() throws NoSuchMethodException, ClassNotFoundException {
+            if (!initialized) {
+                synchronized (this) {
+                    if (!initialized) {
+                        value = delegate.get();
+                        initialized = true;
+                    }
+                }
+            }
+            return value;
+        }
+    }
}