Aaron Zinger created AVRO-3509:
----------------------------------
Summary: Allow stateful compression codecs in avro schemas
Key: AVRO-3509
URL: https://issues.apache.org/jira/browse/AVRO-3509
Project: Apache Avro
Issue Type: Improvement
Reporter: Aaron Zinger
Avro schemas specify a compression codec using a named string like "deflate".
However, some workflows aren't able to use compression effectively because
they're applying deflate separately to each individual record, when most
compressibility comes from similarities between records.
For example, say I have a table containing the text of the book Moby Dick. Each
row contains a chapter number, a view counter, and the text of a chapter.
Whenever a row is updated, either to fix a typo in the text or to increment the
view counter, I compress the row, encode it using an Avro schema and codec
stored separately (e.g. in a Confluent schema registry), and send the
compressed message to an external webhook, which fetches and caches the
indicated metadata and uses it to decompress and decode the message. At first,
the inefficiency is not too bad: the entire text of Moby Dick compresses to 41%
of its original size, while compressing each chapter individually yields 148
records with a total size of 46% of the original. But now when I make a small
text change to chapter 5, I re-emit the entire compressed row, a 4320-byte
message. If instead I were compressing the full text of Moby Dick plus a
slightly modified copy of chapter 5, the compressed file would be only 8 bytes
longer! An ideal stateful streaming compression algorithm only needs to
communicate which chapter is being modified (about 1 byte of entropy) plus the
diff. So over time, stateless compression stays at a steady 59% efficiency,
while stateful compression would tend toward nearly 100% efficiency.
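The per-record vs. shared-context gap is easy to reproduce with Python's zlib,
which already supports preset dictionaries via its zdict argument; the
randomly generated "chapter text" below is a synthetic stand-in for the
similar rows described above:

```python
import random
import zlib

# Synthetic chapter text: ~2 KB with little internal repetition, so each
# record compresses poorly on its own but shares almost everything with
# the other records.
random.seed(0)
words = ["".join(random.choice("abcdefghij") for _ in range(6)) for _ in range(300)]
chapter_text = " ".join(words)

records = [f"chapter={i} views={i * 3} {chapter_text}".encode() for i in range(20)]

# Stateless: compress each record independently, as named codecs do today.
stateless = sum(len(zlib.compress(r)) for r in records)

# Stateful: share a preset dictionary out-of-band (here, simply the first
# record) and compress every record against it.
shared_dict = records[0]
stateful = 0
for r in records:
    c = zlib.compressobj(zdict=shared_dict)
    stateful += len(c.compress(r) + c.flush())

print(f"per-record: {stateless} bytes total; shared dictionary: {stateful} bytes total")
```

Each record shrinks to little more than its unique prefix once the shared
context is factored out into the dictionary.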
We could enable stateful compression by allowing codecs of the form, say,
"deflate+customdictionary={bytes}", where bytes is a dynamically generated
Huffman coding of common strings, similar to the start of a compressed block in
a normal deflate format. To decompress, you'd just fetch and cache the codec
along with the schema and prepend it to each compressed message. Updating the
codec would mean bumping the metadata version number, just like updating the
schema, and would use the same flow.
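One possible shape for such a codec string and the consumer-side flow is
sketched below; the "+customdictionary=" name and the base64 encoding of the
dictionary bytes are illustrative assumptions, not a settled wire format:

```python
import base64
import zlib

def parse_codec(codec: str):
    # Assumed format: "deflate+customdictionary=<base64-encoded dictionary>".
    name, sep, param = codec.partition("+customdictionary=")
    return name, base64.b64decode(param) if sep else b""

def decompress(message: bytes, codec: str) -> bytes:
    # The consumer fetches and caches `codec` alongside the schema, then
    # applies its embedded dictionary to every compressed message.
    name, shared_dict = parse_codec(codec)
    if name != "deflate":
        raise ValueError(f"unsupported codec: {name}")
    d = zlib.decompressobj(zdict=shared_dict)
    return d.decompress(message) + d.flush()

# Producer side: register the codec string once, then compress against it.
shared_dict = b"Call me Ishmael. Some years ago, never mind how long precisely"
codec = "deflate+customdictionary=" + base64.b64encode(shared_dict).decode()
c = zlib.compressobj(zdict=shared_dict)
message = c.compress(b"Call me Ishmael, again.") + c.flush()

print(decompress(message, codec))  # round-trips to the original record
```

Updating the dictionary then just means publishing a new codec string under a
bumped metadata version, exactly like a schema change.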
On the compression side, there are a couple of different implementation
options. In my project that's motivating this, the same distributed workers
that process rows and send messages are responsible for generating and
registering schemas. So a worker would start out compressing with a vanilla
codec, but build up a Huffman tree in memory as it sends each message. Once it
has both sent some threshold number of messages and found that compression
using the custom tree beats the most recently registered codec by some
threshold, it registers a new codec based on the custom tree. It then repeats
the process: it compresses in parallel with both the static registered codec
and a stateful one it keeps updating (maybe only for a random sample of
records, to save resources) until both thresholds are crossed again.
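A sketch of that worker loop follows. The threshold values, the registry
client and its methods, and the use of a recent record as a preset dictionary
(standing in for the dynamically built Huffman tree) are all illustrative
assumptions:

```python
import zlib

MSG_THRESHOLD = 5      # assumed: messages sent before considering re-registration
GAIN_THRESHOLD = 0.10  # assumed: required relative improvement over current codec

def _compressor(shared_dict):
    return zlib.compressobj(zdict=shared_dict) if shared_dict else zlib.compressobj()

def avg_compressed_size(samples, shared_dict):
    total = 0
    for s in samples:
        c = _compressor(shared_dict)
        total += len(c.compress(s) + c.flush())
    return total / len(samples)

def build_candidate(samples):
    # Crude stand-in for the in-memory Huffman tree: reuse a recent record as
    # a preset dictionary. A real implementation would mine common strings.
    return samples[-1]

class Worker:
    def __init__(self, registry):
        self.registry = registry  # hypothetical schema/codec registry client
        self.current_dict = b""   # dictionary of the most recently registered codec
        self.samples = []         # records kept for evaluating a candidate codec

    def send(self, record: bytes):
        c = _compressor(self.current_dict)
        self.registry.emit(c.compress(record) + c.flush())
        self.samples.append(record)  # could be a random sample, to save resources
        if len(self.samples) >= MSG_THRESHOLD:
            self.maybe_register()

    def maybe_register(self):
        candidate = build_candidate(self.samples)
        old = avg_compressed_size(self.samples, self.current_dict)
        new = avg_compressed_size(self.samples, candidate)
        if new <= old * (1 - GAIN_THRESHOLD):  # both thresholds crossed
            self.registry.register_codec("deflate+customdictionary", candidate)
            self.current_dict = candidate
        self.samples.clear()  # start accumulating toward the next candidate
```

The two-threshold check keeps the worker from churning out near-identical
codec registrations for marginal gains.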
In other projects, the codec might instead be generated out-of-band by a
centralized process with access to all records. Compression workers would
subscribe to or poll the schema registry and use the most up-to-date codec
they have.
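For that out-of-band variant, the worker side reduces to a small cache that
refreshes from the registry before compressing; the registry client and its
latest() call are assumptions for illustration:

```python
import zlib

class CodecCache:
    """Compress with the most up-to-date codec fetched from a registry."""

    def __init__(self, registry):
        self.registry = registry  # hypothetical registry client
        self.version = -1
        self.shared_dict = b""

    def refresh(self):
        # Poll (or subscribe); only adopt a codec newer than the cached one.
        version, shared_dict = self.registry.latest()  # assumed API
        if version > self.version:
            self.version, self.shared_dict = version, shared_dict

    def compress(self, record: bytes) -> bytes:
        self.refresh()
        if self.shared_dict:
            c = zlib.compressobj(zdict=self.shared_dict)
        else:
            c = zlib.compressobj()
        return c.compress(record) + c.flush()
```

Because each message names the codec version it was compressed with, a
consumer holding an older cache can still fetch the right dictionary on
demand.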
The result would be compression that is transparent to the consumer as long as
they have access to a library that can decompress using these custom codecs,
but that tends toward maximum efficiency over time, such that a 9122-byte
message typically gets compressed to around 10 bytes.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)