jasonk000 opened a new pull request #12303:
URL: https://github.com/apache/druid/pull/12303


   ### Description
   
Introduce parallel parsing to the stream ingestion service. This is slightly different from prior work in that it allows each "indexing" task to run its parse and transform stages across multiple threads and feed the results into a single appenderator.
   
This increases the throughput of each task, which has a number of benefits: it lets us reduce the overall task count for a given number of Kafka partitions, and it allows each task to respond better to short peaks and dips in traffic by sharing resources more effectively across partitions.
   
   This is *similar, but different*, to 
https://github.com/apache/druid/issues/7900:
   - In 7900, the Indexer creates a number of threads, and each threaded task operates independently within the process rather than as a forked process, but retains its previous per-task throughput characteristics.
   - In this PR, each task can run parsing across multiple threads, so a single task can handle more messages.
   
   Configurable:
   - Introduce a new parameter, `parsingThreadCount`, which can be set on the Kafka tuning config and configures how many threads are used to parse and apply transforms to records loaded from the streaming datasource.
   - If set, the parameter creates a new `parserThreadPool` which is used for parse & transform operations.
   - If unset (`null`) or `1`, the existing in-thread behavior occurs (nb: the parse & transform work conceptually moves earlier in the processing).
   - If `0` or a negative value such as `-1` is provided, the thread count is selected automatically via `Runtime.getRuntime().availableProcessors()`.
   - If a value of `2` or more is provided, that value is used as the size of the thread pool.
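
   The selection rules above can be sketched roughly as follows. This is an illustrative helper only; the class and method names here are assumptions, not the PR's actual code:

   ```java
   // Hypothetical sketch of the parsingThreadCount selection rules
   // described above; not the actual Druid implementation.
   public final class ParsingThreadCountResolver {
       /**
        * Returns 1 (keep the existing in-thread behavior) for null or 1,
        * the number of available processors for 0 or any negative value,
        * and the given value itself when it is 2 or more.
        */
       public static int resolve(Integer parsingThreadCount) {
           if (parsingThreadCount == null || parsingThreadCount == 1) {
               return 1; // existing single-threaded parse & transform
           }
           if (parsingThreadCount <= 0) {
               // automatic selection based on the host's processor count
               return Runtime.getRuntime().availableProcessors();
           }
           return parsingThreadCount; // explicit pool size
       }
   }
   ```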
   
   To support this:
   - Introduce a new `ParseResult` which, combined with `Pair` and `List`, can capture the full parse results with any possible errors, including tracking whether the result is thrown away, so that all meters are updated correctly.
   - Introduce a `CachingInputRow` that can be used to perform the 
transformation of the contents of a row on the parallel threads.
   - Ensure `TransformSpec::toTransformer` is performed only once and reused 
(since the input is immutable and a `Transformer` is expensive to create).
   - Update `kafka-supervisor-reference.md` with new parameter 
`parsingThreadCount`
   - Extend `KafkaIndexTest` to test with varying combinations of 
`parsingThreadCount`
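
   To illustrate the `ParseResult` idea, here is a minimal sketch of a result type that carries either parsed rows or a parse error, plus a flag for rows that were filtered out, so meters can be updated at append time. Field and method names are assumptions for illustration, not the PR's actual shape:

   ```java
   import java.util.Collections;
   import java.util.List;

   // Illustrative-only sketch: a parse result that defers meter updates by
   // recording rows, errors, and thrown-away status for later accounting.
   public final class ParseResult<T> {
       private final List<T> rows;
       private final Exception parseError;
       private final boolean thrownAway;

       private ParseResult(List<T> rows, Exception parseError, boolean thrownAway) {
           this.rows = rows;
           this.parseError = parseError;
           this.thrownAway = thrownAway;
       }

       public static <T> ParseResult<T> ok(List<T> rows) {
           return new ParseResult<>(rows, null, false);
       }

       public static <T> ParseResult<T> error(Exception e) {
           return new ParseResult<>(Collections.emptyList(), e, false);
       }

       public static <T> ParseResult<T> thrownAway() {
           return new ParseResult<>(Collections.emptyList(), null, true);
       }

       public boolean isError() { return parseError != null; }
       public boolean isThrownAway() { return thrownAway; }
       public List<T> rows() { return rows; }
   }
   ```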
   
   ### Discussion
   
   - I opted to focus only on the parse & load stage. To avoid changing the metric counters, the parse stage collects all possible parse results; later, immediately prior to append, the metrics are updated for each row. This ensures all of the existing statistics behavior and error handling is performed exactly as today. It's not clear to me how important the meter data is; if it is not deemed important, we could simplify the result handling and processing.
   - In our cluster, the "serial" part of the work, split between `getRecords` and `Appenderator.add()`, takes about 50% of the task effort, so Amdahl's law limits this to a ~1.6x speedup for 4 threads. Since we allocate multiple threads to a task anyway (for GC, queries, etc.), being able to use these 4 extra threads provides "free" extra throughput. Further work on the appenderator would allow a significantly larger boost.
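
   The ~1.6x figure quoted above follows directly from Amdahl's law: with serial fraction `s` and `n` parallel threads, the speedup is `1 / (s + (1 - s) / n)`. A quick sanity check (illustrative code, not part of the PR):

   ```java
   // Amdahl's law: speedup = 1 / (s + (1 - s) / n), where s is the
   // serial fraction of the work and n is the number of parallel threads.
   public final class AmdahlCheck {
       public static double speedup(double serialFraction, int threads) {
           return 1.0 / (serialFraction + (1.0 - serialFraction) / threads);
       }
   }
   ```

   For `s = 0.5` and `n = 4`: `1 / (0.5 + 0.5 / 4) = 1 / 0.625 = 1.6`, matching the estimate above.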
   
   ### Future work
   
   - It would be interesting to combine this with #7900; in theory it should be possible to share even the parse thread pool across multiple tasks so that resources are automatically available.
   - The appenderator is currently marked as single-caller-only and so does not support parallel `add()` calls; this enforces a natural Amdahl's law limit on the scalability. I will have future PRs that should reduce the cost of the appenderator add call. In an ideal world, it would be possible to create a concurrent appenderator, which would let the entire task parse across many threads.
   
   ##### Key changed/added classes in this PR
    * `extensions-core/kafka-indexing-service/*` to handle the new tuning 
parameter
    * `extensions-core/kinesis-indexing-service/*` to handle the new tuning 
parameter
    * `core/../CachingInputRow` introduced to allow complete pre-instantiation 
of a row
    * `StreamChunkParser` returns a `ParseResult` which embeds the parsing 
output as well as the status (such as thrown_away)
    * `SeekableStreamIndexTaskRunner` (and related) to accept the new 
parameter, and a new function `mapRowBlocks` that performs the logic of 
transforming input record blocks to a parsed block of records ready for 
appending.
    * `TransformSpec` is modified to allow it to instantiate a `Transformer` 
only once.
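
   The `mapRowBlocks` idea described above — fan a block of raw records out to the parser pool, then collect the results in the original order so the single appenderator still consumes them serially — can be sketched as follows. Names and signatures here are illustrative assumptions, not the PR's actual code:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.Future;
   import java.util.function.Function;

   // Hedged sketch of parallel parse & transform over a block of records.
   public final class ParallelBlockMapper {
       public static <I, O> List<O> mapRowBlocks(
               List<I> block,
               Function<I, O> parseAndTransform,
               ExecutorService parserThreadPool
       ) {
           // Submit every record in the block to the parser pool.
           List<Future<O>> futures = new ArrayList<>(block.size());
           for (I record : block) {
               futures.add(parserThreadPool.submit(() -> parseAndTransform.apply(record)));
           }
           // Collect in submission order, so downstream append stays ordered
           // regardless of which parse finishes first.
           List<O> out = new ArrayList<>(block.size());
           for (Future<O> f : futures) {
               try {
                   out.add(f.get());
               } catch (InterruptedException | ExecutionException e) {
                   throw new RuntimeException("parse task failed", e);
               }
           }
           return out;
       }
   }
   ```

   Collecting via `Future.get()` in submission order is what keeps the appenderator's single-caller contract intact: parsing is parallel, but the append stage still sees rows serially and in order.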
   
   This PR has:
   - [x] been self-reviewed.
      - [x] using the [concurrency 
checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md)
 (Remove this item if the PR doesn't have any relation to concurrency.)
   - [x] added documentation for new or modified features or behaviors.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked 
related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in 
[licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [x] added comments explaining the "why" and the intent of the code 
wherever would not be obvious for an unfamiliar reader.
   - [x] added unit tests or modified existing tests to cover new code paths, 
ensuring the threshold for [code 
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
 is met.
   - [x] added integration tests.
   - [x] been tested in a test Druid cluster.
   

