[ https://issues.apache.org/jira/browse/KAFKA-6430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16315495#comment-16315495 ]

Ted Yu commented on KAFKA-6430:
-------------------------------

bq. Here is the awkward writeInt() method in DataInputStream

I guess you meant DataOutputStream instead of DataInputStream.

Please surround the code snippets with {code} blocks to make the code more readable.

> Improve Kafka GZip compression performance
> ------------------------------------------
>
>                 Key: KAFKA-6430
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6430
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, core
>            Reporter: Ying Zheng
>            Priority: Minor
>
> To compress messages, Kafka uses DataOutputStream on top of GZIPOutputStream:
> {code:java}
> new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
> {code}
> To decompress messages, Kafka uses DataInputStream on top of GZIPInputStream:
> {code:java}
> new DataInputStream(new GZIPInputStream(buffer));
> {code}
> This is straightforward, but inefficient. For each message, in addition to
> the key and value data, Kafka has to write about 30 metadata bytes (the exact
> number varies slightly across Kafka versions), including the magic byte,
> checksum, timestamp, offset, key length, value length, etc. For each of these
> bytes, Java's DataOutputStream calls write(byte) once. Here is the awkward
> writeInt() method in DataOutputStream, which writes the 4 bytes separately in
> big-endian order:
> {code:java}
>     public final void writeInt(int v) throws IOException {
>         out.write((v >>> 24) & 0xFF);
>         out.write((v >>> 16) & 0xFF);
>         out.write((v >>>  8) & 0xFF);
>         out.write((v >>>  0) & 0xFF);
>         incCount(4);
>     }
> {code}
> Unfortunately, GZIPOutputStream does not provide an efficient single-byte
> write() method. It only implements write(byte[], offset, len), which calls
> the corresponding JNI zlib function. The single-byte write() calls from
> DataOutputStream are translated into write(byte[], offset, len) calls in a
> very inefficient way (Oracle JDK 1.8 code):
> {code:java}
> class DeflaterOutputStream {
>     public void write(int b) throws IOException {
>         byte[] buf = new byte[1];
>         buf[0] = (byte)(b & 0xff);
>         write(buf, 0, 1);
>     }
>     public void write(byte[] b, int off, int len) throws IOException {
>         if (def.finished()) {
>             throw new IOException("write beyond end of stream");
>         }
>         if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
>             throw new IndexOutOfBoundsException();
>         } else if (len == 0) {
>             return;
>         }
>         if (!def.finished()) {
>             def.setInput(b, off, len);
>             while (!def.needsInput()) {
>                 deflate();
>             }
>         }
>     }
> }
>
> class GZIPOutputStream extends DeflaterOutputStream {
>     public synchronized void write(byte[] buf, int off, int len)
>         throws IOException
>     {
>         super.write(buf, off, len);
>         crc.update(buf, off, len);
>     }
> }
>
> class Deflater {
>     private native int deflateBytes(long addr, byte[] b, int off, int len,
>         int flush);
> }
>
> class CRC32 {
>     public void update(byte[] b, int off, int len) {
>         if (b == null) {
>             throw new NullPointerException();
>         }
>         if (off < 0 || len < 0 || off > b.length - len) {
>             throw new ArrayIndexOutOfBoundsException();
>         }
>         crc = updateBytes(crc, b, off, len);
>     }
>     private static native int updateBytes(int crc, byte[] b, int off, int len);
> }
> {code}
> For each metadata byte, the code above has to allocate a one-byte array,
> acquire several locks (GZIPOutputStream.write() and the Deflater methods are
> synchronized), and make two JNI calls (Deflater.deflateBytes and
> CRC32.updateBytes). Each Kafka message carries about 30 such metadata bytes.
> The call stack of Deflater.deflateBytes():
> {noformat}
> DeflaterOutputStream.write(int b)
>   -> GZIPOutputStream.write(byte[] buf, int off, int len)
>   -> DeflaterOutputStream.write(byte[] b, int off, int len)
>   -> DeflaterOutputStream.deflate()
>   -> Deflater.deflate(byte[] b, int off, int len)
>   -> Deflater.deflate(byte[] b, int off, int len, int flush)
>   -> Deflater.deflateBytes(long addr, byte[] b, int off, int len, int flush)
> {noformat}
> The call stack of CRC32.updateBytes():
> {noformat}
> DeflaterOutputStream.write(int b)
>   -> GZIPOutputStream.write(byte[] buf, int off, int len)
>   -> CRC32.update(byte[] b, int off, int len)
>   -> CRC32.updateBytes(int crc, byte[] b, int off, int len)
> {noformat}
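> To see the per-byte overhead in isolation, here is a minimal self-contained
> demo (not part of the patch; the class name, byte count, and buffer sizes are
> illustrative assumptions, and this is a rough timing loop rather than a
> proper JMH benchmark):
> {code:java}
> import java.io.BufferedOutputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.DataOutputStream;
> import java.io.IOException;
> import java.io.OutputStream;
> import java.util.zip.GZIPOutputStream;
>
> public class GzipWriteOverheadDemo {
>     // Writes totalBytes one byte at a time through DataOutputStream,
>     // the way Kafka writes per-message metadata bytes.
>     static long timeSingleByteWrites(OutputStream sink, int totalBytes)
>             throws IOException {
>         long start = System.nanoTime();
>         try (DataOutputStream out = new DataOutputStream(sink)) {
>             for (int i = 0; i < totalBytes; i++) {
>                 out.writeByte(i); // each call reaches the underlying write(int)
>             }
>         }
>         return System.nanoTime() - start;
>     }
>
>     public static void main(String[] args) throws IOException {
>         int totalBytes = 4 * 1024 * 1024;
>
>         // Unbuffered: every writeByte() becomes a one-byte array allocation,
>         // a synchronized GZIPOutputStream.write(), and two JNI calls.
>         long unbuffered = timeSingleByteWrites(
>                 new GZIPOutputStream(new ByteArrayOutputStream(), 8192), totalBytes);
>
>         // Buffered: single-byte writes are coalesced in a 16 KB heap buffer,
>         // so the JNI deflate/CRC calls see large chunks instead.
>         long buffered = timeSingleByteWrites(
>                 new BufferedOutputStream(
>                         new GZIPOutputStream(new ByteArrayOutputStream(), 8192), 1 << 14),
>                 totalBytes);
>
>         System.out.printf("unbuffered: %d ms, buffered: %d ms%n",
>                 unbuffered / 1_000_000, buffered / 1_000_000);
>     }
> }
> {code}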
> At Uber, we found that adding a small buffer between DataOutputStream and
> GZIPOutputStream speeds up Kafka GZip compression by about 60% on average:
> {code}
> -                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
> +                    return new DataOutputStream(new BufferedOutputStream(new GZIPOutputStream(buffer, bufferSize), 1 << 14));
> {code}
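> For reference, BufferedOutputStream.write(int) in Oracle JDK 1.8 only
> appends the byte to an in-memory array, so the roughly 30 single-byte
> metadata writes per message no longer cause any array allocation or JNI
> calls:
> {code:java}
>     public synchronized void write(int b) throws IOException {
>         if (count >= buf.length) {
>             flushBuffer();
>         }
>         buf[count++] = (byte)b;
>     }
> {code}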
> A similar fix also applies to GZip decompression; a sketch follows below.
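> For example (a sketch, not the exact patch; the 1 << 14 buffer size here is
> an assumption that simply mirrors the compression side):
> {code}
> -                    return new DataInputStream(new GZIPInputStream(buffer));
> +                    return new DataInputStream(new BufferedInputStream(new GZIPInputStream(buffer), 1 << 14));
> {code}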
> We have tested this improvement on Kafka 0.10.2 / Oracle JDK 8, with
> production traffic at Uber:
> || Topic || Avg Message Size (bytes) || Vanilla Kafka Throughput (MB/s) || Kafka w/ GZip Buffer Throughput (MB/s) || Speedup ||
> | topic 1 | 197 | 10.9 | 21.9 | 2.0 |
> | topic 2 | 208 | 8.5 | 15.9 | 1.9 |
> | topic 3 | 624 | 15.3 | 20.2 | 1.3 |
> | topic 4 | 766 | 28.0 | 43.7 | 1.6 |
> | topic 5 | 1168 | 22.9 | 25.4 | 1.1 |
> | topic 6 | 165021 | 9.1 | 9.2 | 1.0 |



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
