Aaron Zinger created AVRO-3509:
----------------------------------

             Summary: Allow stateful compression codecs in avro schemas
                 Key: AVRO-3509
                 URL: https://issues.apache.org/jira/browse/AVRO-3509
             Project: Apache Avro
          Issue Type: Improvement
            Reporter: Aaron Zinger


Avro schemas specify a compression codec using a named string like "deflate". 
However, some workflows aren't able to use compression effectively because 
they're applying deflate separately to each individual record, when most 
compressibility comes from similarities between records.

For example, say I have a table containing the text of the book Moby Dick. Each 
row contains a chapter number, a view counter, and the text of a chapter. 
Whenever a row is updated, either to fix a typo in the text or to increment the 
view counter, I compress the row, encode it using an Avro schema and codec 
stored separately (e.g. using a Confluent schema registry), and send the 
compressed message to an external webhook, which fetches and caches the 
indicated metadata and uses it to decompress and decode the message. At first, 
the inefficiency is not too bad. The entire text of Moby Dick compresses to 41% 
of its original size, but compressing each chapter individually leads to 148 
different records of total size 46% of the original. But now when I make a 
small text change to chapter 5, I re-emit the entire compressed row, for a 
4320-byte message. If instead I were compressing the full text of Moby Dick 
plus a slightly-modified copy of chapter 5, the compressed file would only be 8 
bytes longer! An ideal stateful streaming compression algorithm only needs to 
communicate which chapter is being modified (about 1 byte of entropy), and the 
diff. So over time, stateless compression stays at a steady 59% efficiency, 
while stateful compression would tend toward 100% efficiency.

We could enable stateful compression by allowing codecs of the form, say, 
"deflate+customdictionary={bytes}", where bytes is a dynamically generated 
Huffman coding of common strings, similar to the start of a compressed block in 
a normal deflate format. To decompress, you'd just fetch and cache the codec 
along with the schema and prepend it to each compressed message. Updating the 
codec would mean bumping the metadata version number, just like updating the 
schema, and would use the same flow. 
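The core mechanism already exists in deflate as preset dictionaries. A rough sketch in Python, whose zlib module exposes this via the zdict parameter (the sample corpus and the one-character edit below are made up for illustration), shows how priming the compressor with a dictionary built from a previous version of a record shrinks the output of a slightly modified record:

```python
import zlib

# Hypothetical "previous version" of a record; in this proposal, these
# dictionary bytes would ship inside the registered codec string.
chapter = " ".join(f"word{i} harpoon{i % 7}" for i in range(300)).encode()
zdict = chapter

# A new record that differs from the dictionary by one small edit.
record = chapter.replace(b"word5 ", b"word5! ", 1)

# Baseline: plain deflate with no shared state.
plain = zlib.compress(record)

# Stateful: deflate primed with the shared dictionary.
co = zlib.compressobj(zdict=zdict)
with_dict = co.compress(record) + co.flush()

# The consumer decompresses with the same cached dictionary.
do = zlib.decompressobj(zdict=zdict)
assert do.decompress(with_dict) + do.flush() == record

print(len(plain), len(with_dict))  # the dictionary version is far smaller
```

This is only the transport-level trick; the interesting part of the proposal is generating and versioning the dictionary through the registry.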

On the compression side, there are a couple of different implementation 
options. In my project that's motivating this, the same distributed workers 
that process rows and send messages are responsible for generating and 
registering schemas. So a worker would start out compressing using a vanilla 
codec, but build up a Huffman tree in memory as it sends each message. When 
it's both sent some threshold number of messages and compression using the 
custom tree is some threshold more efficient than using the 
most-recently-registered codec, it registers a new codec using the custom tree, 
then repeats this process, compressing in parallel using both the static 
registered codec and a stateful one it continues to update (maybe only for a 
random sample of records to save resources) until both thresholds are crossed 
again.
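The two-threshold decision could look something like the following sketch (Python; the threshold values, the sampling, and the should_register name are all illustrative, and a deflate preset dictionary stands in for the custom Huffman tree):

```python
import zlib

def compressed_size(record: bytes, zdict: bytes) -> int:
    """Size of `record` under deflate primed with `zdict` (empty = vanilla)."""
    co = zlib.compressobj(zdict=zdict) if zdict else zlib.compressobj()
    return len(co.compress(record) + co.flush())

def should_register(sample, candidate_dict, registered_dict, n_sent,
                    min_messages=1000, min_gain=0.10):
    """Both thresholds from the proposal: enough messages sent, and the
    candidate dictionary beats the most-recently-registered codec on a
    sample of records by at least `min_gain`. Values are illustrative."""
    if n_sent < min_messages:
        return False
    cand = sum(compressed_size(r, candidate_dict) for r in sample)
    reg = sum(compressed_size(r, registered_dict) for r in sample)
    return cand <= (1 - min_gain) * reg
```

A worker would evaluate this after each batch; when it returns True, the worker registers the candidate dictionary as a new codec version and starts training the next one.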

In other projects, the codec might be generated separately out-of-band by a 
centralized process that has access to all records. Compression workers would 
subscribe to or poll the schema registry and use the most up-to-date codec they 
have.
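On the consuming side, fetching and caching codecs could follow the same shape as schema caching. A minimal sketch (Python; the fetch callable, the exact codec-string syntax, and the latin-1 byte packing are assumptions for illustration, not a proposed wire format):

```python
import zlib

class CodecCache:
    """Caches codec strings of the proposed form
    "deflate+customdictionary={bytes}" fetched from a registry, keyed by
    metadata version, and decompresses messages with the right dictionary."""

    def __init__(self, fetch):
        self.fetch = fetch   # hypothetical: version -> codec string
        self.dicts = {}      # version -> dictionary bytes

    def _zdict(self, version):
        if version not in self.dicts:
            codec = self.fetch(version)
            name, sep, arg = codec.partition("+customdictionary=")
            if name != "deflate":
                raise ValueError(f"unsupported codec: {name}")
            # Assumption: dictionary bytes are packed as latin-1 text.
            self.dicts[version] = arg.encode("latin-1") if sep else b""
        return self.dicts[version]

    def decompress(self, version, payload):
        d = self._zdict(version)
        do = zlib.decompressobj(zdict=d) if d else zlib.decompressobj()
        return do.decompress(payload) + do.flush()
```

Whether the codec is trained on-worker or out-of-band, consumers only ever see a (version, payload) pair and resolve the dictionary through the registry.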

The result would be compression that is transparent to the consumer as long as 
they have access to a library that can decompress using these custom codecs, 
but that tends toward maximum efficiency over time, such that a 9122-byte 
message typically gets compressed to around 10 bytes.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
