liran-funaro opened a new pull request #10001: URL: https://github.com/apache/druid/pull/10001
Fixes [#9967](https://github.com/apache/druid/issues/9967).

### Description

This PR improves Druid's ingestion memory and CPU efficiency. It uses **60%** less memory and **50%** less CPU time to achieve the same performance. This translates to nearly **double** the system's ingestion throughput with the same memory budget, and a **75%** increase in throughput with the same CPU-time budget.

To understand the motivation and rationale behind some of the proposed changes below, please read the related issue: [#9967](https://github.com/apache/druid/issues/9967).

The proposed code modifications are organized in four commits:

1. [``0af1720``](https://github.com/liran-funaro/druid/commit/0af17207f47089f14b305abd335999e4535f17ff) Introduce `OakIncrementalIndex`
1. [``a6777be``](https://github.com/liran-funaro/druid/commit/a6777be8eaaec6a0168488f3a9ff8ca52feb380f) Add **test** cases for `OakIncrementalIndex`
1. [``95b0b81``](https://github.com/liran-funaro/druid/commit/95b0b81edd8e14b2b19c37825cc042b3a98c0572) Add **benchmark** cases for `OakIncrementalIndex`
1. [``eed0606``](https://github.com/liran-funaro/druid/commit/eed0606a67f89b653e8cc402eb0fe36af511de30) Add `incrementalIndexType` ingestion configuration knob

#### Introduce `OakIncrementalIndex`

We add a new incremental index implementation: `OakIncrementalIndex`. The implementation is mostly borrowed from `OnheapIncrementalIndex` and `OffheapIncrementalIndex`, but has a few notable differences:

1. It stores both **keys** and **values** off-heap (as opposed to the off-heap implementation, which stores only the **values** off-heap).
1. It is based on [`OakMap`](https://github.com/yahoo/Oak) instead of Java's `ConcurrentSkipListMap` (CSL).
1. It does not need to keep a mapping from row index to an actual row.
1. It is always ordered (as expected by `FactsHolder.persistIterable()`), even in plain mode.

To achieve the best performance, we had to make some modifications to the interfaces of `IncrementalIndexRow` and `IncrementalIndex`, and we adapted other places in the code to fit these new interfaces.

The modifications to `IncrementalIndex` include:

1. A generic index builder: `public IncrementalIndex build(String incrementalIndexType)`. It accepts one of three strings, "onheap", "offheap", or "oak", and instantiates a new incremental index accordingly.
1. The `getMetric` methods now accept an `IncrementalIndexRow` instead of an `int rowOffset`, because `OakIncrementalIndex` does not keep a mapping from row index to an actual row. This does not affect the performance of the other implementations, because wherever these methods are used, the caller already has an `IncrementalIndexRow` object.

The modifications to `IncrementalIndexRow` allow lazy evaluation of off-heap keys without adding overhead to the on-heap case. We add or modify the following methods:

1. Change `public Object[] getDims()` to `public Object getDim(int index)`
1. Add `public int getDimsLength()`
1. Add `public boolean isDimNull(int index)`
1. Add `public IndexedInts getStringDim(final int dimIndex)`

The purpose of `getStringDim(final int dimIndex)` is to return a lazily evaluated view of a string dimension instead of the integer array returned by `getDim(int index)`. The modified implementations in `StringDimensionIndexer` and `IncrementalIndexAdapter` first try to fetch the string dimension using this method, and if it returns `null`, they fall back to the integer array. This allows `OakIncrementalIndex` to use the lazy-evaluation approach, while the other implementations return `null` and keep using their existing integer arrays directly, without performance degradation.
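Storing keys off-heap means the index has to read and order rows in their serialized form. The following Java sketch illustrates that idea under loudly stated assumptions: the byte layout (a `long` timestamp, an `int` dimension count, then one dictionary-encoded `int` per dimension) and the class `SerializedKeySketch` are hypothetical and do not describe the actual `OakKey` format.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical off-heap row key, NOT the real OakKey layout:
 * [timestamp:long][numDims:int][dim0:int]...[dimN-1:int]
 */
final class SerializedKeySketch {
  private static final int DIMS_START = Long.BYTES + Integer.BYTES;

  private SerializedKeySketch() {}

  /** Writes the key into a direct (off-heap) buffer, ready to be read from position 0. */
  static ByteBuffer serialize(long timestamp, int[] dims) {
    ByteBuffer buf = ByteBuffer.allocateDirect(DIMS_START + dims.length * Integer.BYTES);
    buf.putLong(timestamp);
    buf.putInt(dims.length);
    for (int d : dims) {
      buf.putInt(d);
    }
    buf.flip();
    return buf;
  }

  /** Number of dimensions, read with an absolute get (buffer position is untouched). */
  static int getDimsLength(ByteBuffer key) {
    return key.getInt(Long.BYTES);
  }

  /** Reads a single dimension lazily, without materializing the whole array. */
  static int getDim(ByteBuffer key, int index) {
    return key.getInt(DIMS_START + index * Integer.BYTES);
  }

  /**
   * Orders serialized keys by timestamp, then dimension values lexicographically,
   * with the shorter key first on a common prefix. This gives a consistent total order
   * without deserializing either key.
   */
  static int compare(ByteBuffer a, ByteBuffer b) {
    int cmp = Long.compare(a.getLong(0), b.getLong(0));
    if (cmp != 0) {
      return cmp;
    }
    int lenA = getDimsLength(a);
    int lenB = getDimsLength(b);
    int common = Math.min(lenA, lenB);
    for (int i = 0; i < common; i++) {
      cmp = Integer.compare(getDim(a, i), getDim(b, i));
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(lenA, lenB);
  }
}
```

Because `getDim` reads one value at a computed offset, a caller that needs a single dimension never deserializes the rest of the key, which is the same motivation behind the lazy `IncrementalIndexRow` accessors.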
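The string-keyed builder can be pictured as a plain dispatch on the three accepted values. In this hypothetical sketch, `PlaceholderIndex` and `IndexFactorySketch` stand in for `IncrementalIndex` and its builder; the flags only record where each implementation keeps keys and values, as stated in this PR. Rejecting unknown strings with `IllegalArgumentException` is an assumption, not necessarily what the PR does.

```java
import java.util.Objects;

/** Hypothetical stand-in for an incremental index: records where keys and values live. */
final class PlaceholderIndex {
  final String type;
  final boolean keysOffHeap;
  final boolean valuesOffHeap;

  PlaceholderIndex(String type, boolean keysOffHeap, boolean valuesOffHeap) {
    this.type = type;
    this.keysOffHeap = keysOffHeap;
    this.valuesOffHeap = valuesOffHeap;
  }
}

final class IndexFactorySketch {
  private IndexFactorySketch() {}

  /** Mirrors the shape of build(String incrementalIndexType): choose an implementation by name. */
  static PlaceholderIndex build(String incrementalIndexType) {
    Objects.requireNonNull(incrementalIndexType, "incrementalIndexType");
    switch (incrementalIndexType) {
      case "onheap":
        // Keys and values both on the Java heap.
        return new PlaceholderIndex("onheap", false, false);
      case "offheap":
        // Only the aggregator values off-heap; keys stay on-heap.
        return new PlaceholderIndex("offheap", false, true);
      case "oak":
        // Both keys and values off-heap.
        return new PlaceholderIndex("oak", true, true);
      default:
        // Assumption: unknown names are rejected rather than silently defaulted.
        throw new IllegalArgumentException("Unknown incrementalIndexType: " + incrementalIndexType);
    }
  }
}
```

Keeping the selection behind a single string makes it straightforward to thread the choice through configuration down to wherever the index is constructed.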
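The null-returning `getStringDim` fallback described above can be sketched as follows. All names are hypothetical simplifications: `IntSequence` stands in for Druid's `IndexedInts`, `RowSketch` for `IncrementalIndexRow`, and `DimReaderSketch.readIds` plays the role of the consumer logic in `StringDimensionIndexer` and `IncrementalIndexAdapter`. The serialized row layout in `LazyRowSketch` is also an assumption.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal stand-in for Druid's IndexedInts: a read-only sequence of ints. */
interface IntSequence {
  int size();
  int get(int index);
}

/** Hypothetical stand-in for IncrementalIndexRow, reduced to the accessors listed above. */
interface RowSketch {
  Object getDim(int index);               // eager form: an int[] of dictionary ids
  int getDimsLength();
  boolean isDimNull(int index);
  IntSequence getStringDim(int dimIndex); // lazy form, or null if unsupported
}

/** On-heap style row: dims already live in an Object[], so no lazy view is offered. */
final class OnheapRowSketch implements RowSketch {
  private final Object[] dims;

  OnheapRowSketch(Object[] dims) {
    this.dims = dims;
  }

  @Override public Object getDim(int index) { return dims[index]; }
  @Override public int getDimsLength() { return dims.length; }
  @Override public boolean isDimNull(int index) { return index >= dims.length || dims[index] == null; }
  @Override public IntSequence getStringDim(int dimIndex) { return null; }
}

/** Serialized row: each dim is [count:int][id:int]... at a known buffer offset; -1 marks null. */
final class LazyRowSketch implements RowSketch {
  private static final IntSequence EMPTY = new IntSequence() {
    @Override public int size() { return 0; }
    @Override public int get(int index) { throw new IndexOutOfBoundsException(String.valueOf(index)); }
  };

  private final ByteBuffer buf;
  private final int[] dimOffsets;

  LazyRowSketch(ByteBuffer buf, int[] dimOffsets) {
    this.buf = buf;
    this.dimOffsets = dimOffsets;
  }

  @Override public int getDimsLength() { return dimOffsets.length; }
  @Override public boolean isDimNull(int index) { return index >= dimOffsets.length || dimOffsets[index] < 0; }

  @Override public IntSequence getStringDim(int dimIndex) {
    if (isDimNull(dimIndex)) {
      return EMPTY;
    }
    final int offset = dimOffsets[dimIndex];
    // Every call reads straight from the buffer; nothing is copied up front.
    return new IntSequence() {
      @Override public int size() { return buf.getInt(offset); }
      @Override public int get(int i) { return buf.getInt(offset + Integer.BYTES * (1 + i)); }
    };
  }

  @Override public Object getDim(int index) {
    if (isDimNull(index)) {
      return null;
    }
    // Eager form: materialize the ids into a fresh array.
    IntSequence seq = getStringDim(index);
    int[] out = new int[seq.size()];
    for (int i = 0; i < out.length; i++) {
      out[i] = seq.get(i);
    }
    return out;
  }
}

final class DimReaderSketch {
  private DimReaderSketch() {}

  /** Prefers the lazy view; falls back to the eager int[] when getStringDim returns null. */
  static List<Integer> readIds(RowSketch row, int dimIndex) {
    List<Integer> ids = new ArrayList<>();
    if (row.isDimNull(dimIndex)) {
      return ids;
    }
    IntSequence lazy = row.getStringDim(dimIndex);
    if (lazy != null) {
      for (int i = 0; i < lazy.size(); i++) {
        ids.add(lazy.get(i));
      }
    } else {
      for (int id : (int[]) row.getDim(dimIndex)) {
        ids.add(id);
      }
    }
    return ids;
  }
}
```

In this sketch the on-heap row pays only one extra null check per call, while the serialized row avoids copying dictionary ids into a new `int[]` unless a caller explicitly asks for the eager form.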
##### Key changed/added classes in this commit

* Changed:
  - `IncrementalIndex`
  - `IncrementalIndexRow`
  - `StringDimensionIndexer`
  - `IncrementalIndexAdapter`
* Added:
  - `OakIncrementalIndex`: follows the `IncrementalIndex` API
  - `OakIncrementalIndexRow`: follows the `IncrementalIndexRow` API
  - `OakKey`: handles the serialization, deserialization, and comparison of keys
  - `OakValueSerializer`: handles the initialization of the aggregators of a newly inserted row

#### Add test cases for `OakIncrementalIndex`

We modified all the tests that are relevant to the incremental index. The modifications include parametrizing the tests over all incremental-index implementations: on-heap, off-heap, and Oak.

#### Add benchmark cases for `OakIncrementalIndex`

We added an incremental-index parameter (on-heap, off-heap, or Oak) to all the benchmarks that are relevant to the incremental index. In addition, we modified these benchmarks to resolve some issues we encountered.

<details>
<summary>Additional modifications we made to some of the benchmarks.</summary>

* Add additional parameters:
  - rollup opportunity for the row generator
  - number of rows per segment
  - query order: descending/ascending
* Change the rollup-opportunity parameter of the row generator
  - From implicit names (`{"none", "moderate", "high"}`) to an explicit number of repeated timestamps (`{"0", "1000", "10000"}`)
* Add a missing `tearDown()` procedure
* Properly close the queryable index in the `tearDown()` procedure
* Move all temporary folder creation and deletion to the `setup()`/`tearDown()` methods so they do not affect the measurements
* Use a predefined seed for reproducible results, consistent with most other benchmarks
* Add scopes (`@State(Scope.Benchmark)`) that allow testing the incremental index without the overhead of the queryable index benchmark's setup procedure
  - One scope for benchmarking queries on the incremental index
  - One scope for benchmarking queries on the queryable index

</details>

#### Add `incrementalIndexType` ingestion configuration knob

* Add a method to the `AppenderatorConfig` interface, `String getIncrementalIndexType()`, and update all implementations and tests accordingly
* Add a parameter to `Sink`, `String incrementalIndexType`, and build the incremental index accordingly
* Update `RealtimePlumber` to instantiate `Sink` with the additional `incrementalIndexType` parameter, taken from the configuration

<hr>

This PR has:
- [X] been self-reviewed.
- [X] added documentation for new or modified features or behaviors.
- [X] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
- [X] added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
- [X] added unit tests or modified existing tests to cover new code paths.
- [ ] added integration tests.
- [X] been tested in a test Druid cluster.
