funguy-tech opened a new pull request, #17083:
URL: https://github.com/apache/druid/pull/17083

   Implements #17062.
   
   ### Description
   
   Allows usage of built in compression formats for Kinesis ingestion based on 
an optional `compressionFormat` parameter in `ioConfig`.
   
   Unlike Kafka, Kinesis does not provide much by the means of data compression 
- it is a common industry pattern to compress Kinesis data across the wire with 
client-implemented decompression.
   
   
   ### Changes
   
   #### Prerequisite additions to CompressionUtils.java.
   - Added Jackson deserialization support to `Format` to enable simplified 
config exposure.
   
   - Added general `compress`/`decompress` utilities with `ByteBuffer`, 
`InputStream`, and `OutputStream` parameters
     - User specifies desired compression/decompression format as method 
parameter
   
   #### Added `compressionFormat` Enum to IOConfig
   
   - By linking to a specified enum, the field's values are limited at load 
time - invalid values are automatically rejected by existing Druid spec 
safeguards.
   
   - Field is safely fed through:
     - `KinesisSupervisor`
     - `KinesisSupervisorIOConfig`
     - `KinesisSamplerSpec`
     - `KinesisTask`
     - `KinesisTaskIOConfig`
     - `KinesisRecordSupplier`
   
   #### Added handling logic to Kinesis Record Supplier
   
   - If compression is enabled, decompress the user record and replace the 
existing userRecord in-place. This ensures that:
     1. We do not store both the compressed and decompressed versions of the 
record 
     2. Buffer sizes for calculating the aggregate size of outstanding Kinesis 
ingestion data act on the decompressed  / 'true' size of data
   
   ```
    if (compressionFormat != null) {
       userRecord.setData(CompressionUtils.decompress(userRecord.getData(), 
compressionFormat));
     }
   ```
   
   Discussion remains open on competing designs - this is more or less a 
starting point of available functionality to provoke discussion (or implement 
if satisfactory as-is).
   
   #### Release note
   
   Added support for compressed Kinesis streams. Users may specify a 
`compressionFormat` in IOConfig. Accepted values are `bz2`, `gz`, `snappy`, 
`xz`, `zip`, and `zstd`.
   
   
   ##### Key changed/added classes in this PR
    * `CompressionUtils`
    * `KinesisRecordSupplier`
   
   This PR has:
   
   - [x] been self-reviewed.
   - [x] added documentation for new or modified features or behaviors.
   - [x] a release note entry in the PR description.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked 
related entities via Javadoc links.
   - [x] added comments explaining the "why" and the intent of the code 
wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, 
ensuring the threshold for [code 
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
 is met.
   - [x] been tested in a test Druid cluster. (Active production use on streams 
>1GB/s aggregate throughput)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to