Divij Vaidya created KAFKA-14629:
------------------------------------
Summary: Performance improvement for Zstd compressed workload
Key: KAFKA-14629
URL: https://issues.apache.org/jira/browse/KAFKA-14629
Project: Kafka
Issue Type: Improvement
Components: core
Reporter: Divij Vaidya
Assignee: Divij Vaidya
Attachments: benchmark-jira.xlsx
h2. Motivation
From a CPU flamegraph analysis of a compressed workload (openmessaging), we observed that the ValidateMessagesAndAssignOffsets method accounts for 75% of the total CPU time spent in UnifiedLog.appendAsLeader(). The improvements suggested below will reduce CPU usage and increase throughput.
h2. Background
A producer appends multiple records into a batch and compresses the batch. This compressed batch, along with some headers, is sent to the server. The server performs a checksum on the batch to validate data integrity over the network transfer; at this point the batch payload is still compressed. The broker then tries to append the batch to the log. Before appending, the broker performs schema integrity validation on individual records, e.g. that record offsets are monotonically increasing. To perform these validations, the server has to decompress the batch.
The schema validation of a batch on the server is done by decompressing and validating individual records. For each record, the validation needs to read all fields of the record except the key and value. [1]
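To make this validation pattern concrete, here is a minimal sketch of reading only the metadata a schema check needs while skipping over key/value bytes. The fixed-width record layout and the names RecordSkimValidator/skimRecord/sampleRecord are hypothetical simplifications; Kafka's actual record format is varint-encoded and lives in DefaultRecord.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: read the metadata fields a schema check needs
// (here just the offset delta), but skip over key/value bytes, which
// validation does not need to inspect.
public class RecordSkimValidator {

    // Reads a record laid out as:
    // [offsetDelta:int][keyLen:int][key][valueLen:int][value]
    // and returns the offset delta, advancing the buffer past the record.
    static int skimRecord(ByteBuffer buf) {
        int offsetDelta = buf.getInt();          // needed for monotonicity check
        int keyLen = buf.getInt();
        buf.position(buf.position() + keyLen);   // skip key bytes without reading them
        int valueLen = buf.getInt();
        buf.position(buf.position() + valueLen); // skip value bytes without reading them
        return offsetDelta;
    }

    // Test helper: builds one record in the simplified layout above.
    static ByteBuffer sampleRecord(int offsetDelta, byte[] key, byte[] value) {
        ByteBuffer buf = ByteBuffer.allocate(12 + key.length + value.length);
        buf.putInt(offsetDelta);
        buf.putInt(key.length).put(key);
        buf.putInt(value.length).put(value);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer record = sampleRecord(7, new byte[]{1, 2, 3}, new byte[]{4, 5});
        System.out.println(skimRecord(record)); // prints 7
    }
}
```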
h2. Performance requirements
* Pre-allocation of arrays should not add excessive overhead for batches with small records. For example, allocating a 65KB array for a 1KB record is overkill and negatively impacts performance for small requests.
* The overhead of skipping bytes should be minimal. We don't need to read a record's key and value, which on average make up the largest part of a record, so the implementation should skip those bytes efficiently.
* Minimize the number of JNI calls. JNI calls are expensive and work best when fewer calls decompress/compress the same amount of data.
* Minimize new byte array/buffer allocations. Ideally, the only allocation would be the array used to store the decompression output. Even this could be optimized by using buffers backed by direct memory, or by re-using the same buffers, since we process one record at a time.
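The allocation requirements above can be illustrated with a thread-scoped buffer cache, in the spirit of Kafka's BufferSupplier: size the buffer to the request rather than a fixed worst case, and reuse it across records. This is a sketch; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of a thread-scoped buffer supplier: reuse one
// buffer per thread so decompression does not allocate a fresh array
// per record, and small records do not pay for a large fixed-size
// allocation.
public class ReusableBufferSupplier {
    private static final ThreadLocal<ByteBuffer> CACHED = new ThreadLocal<>();

    // Returns the cached buffer if it is large enough, otherwise
    // allocates a new one sized to the request (not a fixed 64KB).
    public static ByteBuffer get(int minCapacity) {
        ByteBuffer cached = CACHED.get();
        if (cached != null && cached.capacity() >= minCapacity) {
            cached.clear();
            return cached;
        }
        return ByteBuffer.allocate(minCapacity);
    }

    // Caller returns the buffer when done so the next record can reuse it.
    public static void release(ByteBuffer buffer) {
        CACHED.set(buffer);
    }
}
```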
h2. Current implementation - decompression + zstd
We allocate a 2KB array called skipArray to store decompressed data [2]. This array is re-used for the scope of a batch (i.e. across all records).
We allocate a 16KB array to buffer data between the skipArray and the underlying zstd-jni library calls [3]. The motivation is to read at least 16KB of data in a single call to the JNI layer. This array is likewise re-used for the scope of a batch.
We provide a BufferPool to zstd-jni, which uses it to create buffers for its own use, i.e. one allocation per batch and one allocation per skip() call. Note that this pool is not used to store the output of decompression. Currently, the BufferPool is scoped to a thread.
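A simplified model of this layering, with a plain InputStream standing in for the zstd-jni stream and hypothetical names (the real code lives in ZStdFactory and DefaultRecordBatch), looks like:

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Sketch of the current layering: a small 2KB skip array drains data
// from a 16KB BufferedInputStream, which batches reads into the
// JNI-backed decompressor so each native call moves at least 16KB.
// In the real code both buffers are reused for the scope of a batch;
// here they are created per call for simplicity.
public class LayeredDecompression {
    static final int INTERMEDIATE_BUFFER = 16 * 1024; // JNI batching buffer
    static final int SKIP_ARRAY = 2 * 1024;           // skipArray equivalent

    // compressedStream stands in for the zstd-jni ZstdInputStream.
    static long skipFully(InputStream compressedStream, long toSkip) {
        BufferedInputStream buffered =
                new BufferedInputStream(compressedStream, INTERMEDIATE_BUFFER);
        byte[] skipArray = new byte[SKIP_ARRAY];
        long skipped = 0;
        try {
            while (skipped < toSkip) {
                int n = buffered.read(skipArray, 0,
                        (int) Math.min(skipArray.length, toSkip - skipped));
                if (n < 0) break; // end of stream
                skipped += n;     // data was decompressed only to be discarded
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return skipped;
    }
}
```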
h2. Potential improvements
# Do not read the end of the batch, since it contains the key/value of the last record. Instead of “skipping” (which would still require decompression), we can simply not read it at all.
# Remove the two layers of buffers (the 16KB and 2KB ones) and replace them with a single buffer called decompressionBuffer. The time it takes to prepare a batch for decompression is bounded by the allocation of the largest buffer, so using only one large (16KB) buffer does not cause a regression.
# Use BufferSupplier to allocate the intermediate decompressed buffer.
# Calculate the size of the decompression buffer dynamically at runtime, e.g. based on the recommendation provided by Zstd; it is currently fixed at 16KB. Using the value recommended by Zstd saves a copy in native code.
[https://github.com/facebook/zstd/issues/340]
# Provide a pool of direct buffers to zstd-jni for its internal usage. Direct buffers are ideal for scenarios where data is transferred across the JNI boundary, such as (de)compression. The latest version of zstd-jni works with direct buffers.
# Read the network input into a direct buffer and pass that to zstd-jni for
decompression. Store the output in a direct buffer as well.
# Use the dictionary functionality of zstd for decompression. Train the dictionary on the first few MBs of data and then use it.
# Use the skip functionality of zstd-jni and do not bring “skipped” data into the Kafka layer, so we don’t need a buffer for skipped data in Kafka. This could be done by using DataInputStream and removing the intermediate buffered stream (the 16KB one).
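Suggestions #2 and #3 together can be sketched as follows: collapse the 2KB skipArray and the 16KB intermediate buffer into one decompressionBuffer obtained from a per-thread supplier, so a batch performs at most one buffer allocation (and often zero, when the cached buffer is reused). Class and method names here are illustrative, not the prototype's actual identifiers.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch of improvements #2 and #3: a single 16KB
// decompression buffer, cached per thread, replaces the two-layer
// (2KB + 16KB) buffering of the current implementation.
public class SingleBufferDecompression {
    static final int DECOMPRESSION_BUFFER_SIZE = 16 * 1024;
    private static final ThreadLocal<byte[]> CACHED =
            ThreadLocal.withInitial(() -> new byte[DECOMPRESSION_BUFFER_SIZE]);

    // Drains `length` bytes of decompressed data through one reused
    // buffer; `decompressed` stands in for the zstd-jni stream.
    static long drain(InputStream decompressed, long length) {
        byte[] buffer = CACHED.get(); // reused across records and batches on this thread
        long consumed = 0;
        try {
            while (consumed < length) {
                int n = decompressed.read(buffer, 0,
                        (int) Math.min(buffer.length, length - consumed));
                if (n < 0) break; // end of stream
                consumed += n;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return consumed;
    }
}
```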
h2. Prototype implementation
[https://github.com/divijvaidya/kafka/commits/optimize-compression]
h2. JMH benchmark of prototype
After implementing suggestion #2 and suggestion #3, we observe a 10-25% improvement in throughput over the existing implementation for large message sizes and a 0-2% improvement in throughput for small message sizes. Note that we expect performance to improve further in production, because the thread-scoped cached memory pool will be re-used to a greater extent there. For detailed results, see the attached benchmark.
h2. References
[1] LogValidator.java
[2] DefaultRecordBatch → skipArray
[3] ZStdFactory
--
This message was sent by Atlassian Jira
(v8.20.10#820010)