funguy-tech opened a new pull request, #17083:
URL: https://github.com/apache/druid/pull/17083
Implements #17062.
### Description
Allows usage of built in compression formats for Kinesis ingestion based on
an optional `compressionFormat` parameter in `ioConfig`.
Unlike Kafka, Kinesis does not provide much by the means of data compression
- it is a common industry pattern to compress Kinesis data across the wire with
client-implemented decompression.
### Changes
#### Prerequisite additions to CompressionUtils.java.
- Added Jackson deserialization support to `Format` to enable simplified
config exposure.
- Added general `compress`/`decompress` utilities with `ByteBuffer`,
`InputStream`, and `OutputStream` parameters
- User specifies desired compression/decompression format as method
parameter
#### Added `compressionFormat` Enum to IOConfig
- By linking to a specified enum, the field's values are limited at load
time - invalid values are automatically rejected by existing Druid spec
safeguards.
- Field is safely fed through:
- `KinesisSupervisor`
- `KinesisSupervisorIOConfig`
- `KinesisSamplerSpec`
- `KinesisTask`
- `KinesisTaskIOConfig`
- `KinesisRecordSupplier`
#### Added handling logic to Kinesis Record Supplier
- If compression is enabled, decompress the user record and replace the
existing userRecord in-place. This ensures that:
1. We do not store both the compressed and decompressed versions of the
record
2. Buffer sizes for calculating the aggregate size of outstanding Kinesis
ingestion data act on the decompressed / 'true' size of data
```
if (compressionFormat != null) {
userRecord.setData(CompressionUtils.decompress(userRecord.getData(),
compressionFormat));
}
```
Discussion remains open on competing designs - this is more or less a
starting point of available functionality to provoke discussion (or implement
if satisfactory as-is).
#### Release note
Added support for compressed Kinesis streams. Users may specify a
`compressionFormat` in IOConfig. Accepted values are `bz2`, `gz`, `snappy`,
`xz`, `zip`, and `zstd`.
##### Key changed/added classes in this PR
* `CompressionUtils`
* `KinesisRecordSupplier`
This PR has:
- [x] been self-reviewed.
- [x] added documentation for new or modified features or behaviors.
- [x] a release note entry in the PR description.
- [x] added Javadocs for most classes and all non-trivial methods. Linked
related entities via Javadoc links.
- [x] added comments explaining the "why" and the intent of the code
wherever would not be obvious for an unfamiliar reader.
- [x] added unit tests or modified existing tests to cover new code paths,
ensuring the threshold for [code
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
is met.
- [x] been tested in a test Druid cluster. (Active production use on streams
>1GB/s aggregate throughput)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]