jasonk000 opened a new pull request #12303: URL: https://github.com/apache/druid/pull/12303
### Description

Introduce parallel parsing to the stream ingestion service. This is slightly different from prior approaches in that it allows each "indexing" task to run its parse and transform stages across multiple threads and feed the results into a single appenderator. This increases the throughput of each task, which has a number of benefits: it lets us reduce the overall task count for a given number of Kafka partitions, and it lets each task respond better to short peaks and dips in traffic by sharing resources across partitions more effectively.

This is *similar, but different*, to https://github.com/apache/druid/issues/7900:

- In 7900, the Indexer creates a number of threads and each threaded task operates independently within the process rather than as a forked process, but each task behaves as it did before in terms of per-task throughput characteristics.
- In this PR, each task can run parsing across multiple threads, so a single task can handle more messages.

Configurable:

- Introduce a new parameter `parsingThreadCount`, settable on the Kafka tuning config, which configures how many threads are used to parse and apply transforms to records loaded from the streaming datasource.
- If set, the parameter creates a new `parserThreadPool` which is used for parse and transform operations.
- If no value (`null`) or `1` is used, the existing in-thread behavior occurs (nb: the parse and transform work conceptually moves earlier in the processing).
- If `-1`, `0`, or another negative value is provided, the number of threads is selected automatically based on `Runtime.getRuntime().availableProcessors()`.
- If a value of `1` or more is used, that is the size of the thread pool.

To support this:

- Introduce a new `ParseResult` which, combined with `Pair` and `List`, can capture the full parse results with any possible errors, including tracking whether the result is thrown away, so that all meters are updated correctly.
- Introduce a `CachingInputRow` that can be used to perform the transformation of a row's contents on the parallel threads.
- Ensure `TransformSpec::toTransformer` is performed only once and the result reused (the input is immutable and a `Transformer` is expensive to create).
- Update `kafka-supervisor-reference.md` with the new parameter `parsingThreadCount`.
- Extend `KafkaIndexTest` to test with varying combinations of `parsingThreadCount`.

### Discussion

- I opted to focus only on the parse and load stage. To avoid changing the metric counters etc., the parse stage collects all possible parse results; later, immediately prior to append, the metrics are updated for the row. This ensures all of the existing statistics behavior and error handling is performed exactly as today. It's not clear to me how important the meter data is; if it is not deemed important, we could simplify the result handling and processing.
- In our cluster, the "serial" part of the work, split between `getRecords` and `Appenderator.add()`, takes about 50% of the task effort, so Amdahl's law limits this to a ~1.6x speedup for 4 threads. Since we allocate multiple threads to a task anyway (for GC, queries, etc.), being able to use these 4 extra threads provides "free" extra throughput. Further work on the appenderator would allow significantly larger gains here.

### Future work

- It would be interesting to combine this with #7900; in theory it should be possible to share even the parse thread pool across multiple tasks so that resources are automatically available.
- The appenderator is currently marked as single-caller-only, so it does not support parallel `add()` calls; this enforces a natural Amdahl's law limit on scalability. I will have future PRs that should reduce the cost of the appenderator `add()` call. In an ideal world, it would be possible to create a concurrent appenderator, which would let the entire task parse across many threads.
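The `parsingThreadCount` resolution rules described above can be sketched as follows. This is a hypothetical helper for illustration only, not the PR's actual code; the method name is an assumption:

```java
public class ParserPoolSizing
{
  /**
   * Resolve the effective parse-thread count per the rules in the description:
   * null or 1 keeps the existing in-thread behavior; 0 or a negative value
   * selects the processor count automatically; any other positive value is
   * used directly as the pool size.
   */
  static int resolveParsingThreadCount(Integer parsingThreadCount)
  {
    if (parsingThreadCount == null || parsingThreadCount == 1) {
      return 1; // existing in-thread behavior, no pool created
    }
    if (parsingThreadCount <= 0) {
      return Runtime.getRuntime().availableProcessors(); // automatic selection
    }
    return parsingThreadCount; // explicit pool size
  }

  public static void main(String[] args)
  {
    System.out.println(resolveParsingThreadCount(null)); // 1
    System.out.println(resolveParsingThreadCount(4));    // 4
  }
}
```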
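The ~1.6x figure quoted in the discussion follows directly from Amdahl's law with a 50% serial fraction and 4 threads: 1 / (0.5 + 0.5/4) = 1.6. A minimal sketch of that arithmetic:

```java
public class AmdahlSpeedup
{
  /** Amdahl's law: speedup = 1 / (s + (1 - s) / n) for serial fraction s and n threads. */
  static double speedup(double serialFraction, int threads)
  {
    return 1.0 / (serialFraction + (1.0 - serialFraction) / threads);
  }

  public static void main(String[] args)
  {
    // 50% serial work, 4 parse threads -> bounded at 1.6x
    System.out.println(speedup(0.5, 4)); // 1.6
  }
}
```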
##### Key changed/added classes in this PR

* `extensions-core/kafka-indexing-service/*` to handle the new tuning parameter
* `extensions-core/kinesis-indexing-service/*` to handle the new tuning parameter
* `core/../CachingInputRow` introduced to allow complete pre-instantiation of a row
* `StreamChunkParser` returns a `ParseResult` which embeds the parsing output as well as the status (such as thrown_away)
* `SeekableStreamIndexTaskRunner` (and related) to accept the new parameter, plus a new function `mapRowBlocks` that transforms blocks of input records into a parsed block of records ready for appending
* `TransformSpec` is modified to allow it to instantiate a `Transformer` only once

This PR has:
- [x] been self-reviewed.
- [x] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
- [x] added documentation for new or modified features or behaviors.
- [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
- [x] added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
- [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
- [x] added integration tests.
- [x] been tested in a test Druid cluster.
