liran-funaro opened a new pull request #10001:
URL: https://github.com/apache/druid/pull/10001


   Fixes [#9967](https://github.com/apache/druid/issues/9967).
   
   ### Description
   This PR improves Druid’s ingestion memory and CPU efficiency. It uses 
**60%** less memory and **50%** less CPU-time to achieve the same performance. 
This translated to nearly **doubles** the system's ingestion-throughput with 
the same memory budget, and a **75%** increase in throughput with the same 
CPU-time budget.
   
   To understand the motivation and rationale behind some of the proposed 
changes below, it is necessary to read the related issue: 
[#9967](https://github.com/apache/druid/issues/9967).
   
   The code modifications we proposed are organized in four commits:
     1. 
[``0af1720``](https://github.com/liran-funaro/druid/commit/0af17207f47089f14b305abd335999e4535f17ff)
 Introduce `OakIncrementalIndex`
     1. 
[``a6777be``](https://github.com/liran-funaro/druid/commit/a6777be8eaaec6a0168488f3a9ff8ca52feb380f)
 Add **test** cases for `OakIncrementalIndex`
     1. 
[``95b0b81``](https://github.com/liran-funaro/druid/commit/95b0b81edd8e14b2b19c37825cc042b3a98c0572)
 Add **benchmark** cases for `OakIncrementalIndex`
     1. 
[``eed0606``](https://github.com/liran-funaro/druid/commit/eed0606a67f89b653e8cc402eb0fe36af511de30)
 Add `incrementalIndexType` ingestion configuration knob
   
   #### Introduce `OakIncrementalIndex`
   We add a new incremental index implementation: `OakIncrementalIndex`. The 
implementation is mostly borrowed from `OnheapIncrementalIndex` and 
`OffheapIncrementalIndex`, but has a few notable differences:
     1. It stores both **keys** and **values** off-heap (as opposed to the 
off-heap implementation that stores only the **values** off-heap)
     1. It is based on [`OakMap`](https://github.com/yahoo/Oak) instead of 
Java’s `ConcurrentSkipList` (CSL)
     1. It does not need to keep a mapping from row index to an actual row
     1. It is always ordered (as expected by `FactsHolder.persistIterable()`), 
even in plain mode
    
   To achieve the best performance of our implementation, we had to make some 
modifications to the interfaces of `IncrementalIndexRow` and 
`IncrementalIndex`. We modified other places in the code to fit these new 
interfaces.
   
   The modifications to `IncrementalIndex` includes:
     1. A generic index builder: `public IncrementalIndex build(String 
incrementalIndexType)`.
   It accepts one of three strings: “onheap”, “offheap” and “oak”, and will 
instantiate a new incremental index accordingly.
     1. Changed the `getMetric` methods to accept `IncrementalIndexRow` instead 
of `int rowOffset`, because `OakIncrementalIndex` does not keep a mapping from 
row index to an actual row.
   This does not affect the other implementations performance because in all 
the cases these methods are used, the caller already had an 
`IncrementalIndexRow` object.
   
   The modifications to `IncrementalIndexRow` allow lazy evaluation of off-heap 
keys, without adding an overhead to the on-heap keys case. We add/modify the 
following methods:
     1. Changed `public Object[] getDims()` to `public Object getDim(int index)`
     1. Add `public int getDimsLength()`
     1. Add `public boolean isDimNull(int index)`
     1. Add `public IndexedInts getStringDim(final int dimIndex)`
   
   `getStringDim(final int dimIndex)` purpose is to generate a lazy-evaluation 
version of a string dimension instead of the array of integers that is returned 
by `getDim(int index)`.
   The modified implementation in `StringDimensionIndexer` and 
`IncrementalIndexAdapter`, first try to fetch the string dimension using this 
method, and if it returns `null`, they fall back to using the integer array. 
This allows our implementation (`OakIncrementalIndex`) to use the 
lazy-evaluation approach, while the other implementations return `null` and use 
their existing integer array directly, without performance degradation.
   
   ##### Key changed/added classes in this commit
    * Changed:
      - `IncrementalIndex`
      - `IncrementalIndexRow`
      - `StringDimensionIndexer`
      - `IncrementalIndexAdapter`
    * Added:
      - `OakIncrementalIndex`: follows the `IncrementalIndex` API
      - `OakIncrementalIndexRow`: follows the `IncrementalIndexRow` API
      - `OakKey`: handles the serialization, deserialization, and comparison of 
keys
      - `OakValueSerializer`: handles the initialization of the aggregators of 
a new inserted row
   
   #### Add test cases for `OakIncrementalIndex`
   We modified all the tests that are relevant to the incremental index.
   The modifications include the parametrization of the tests for all 
incremental-index implementations: on-heap, off-heap, and Oak.
   
   #### Add benchmark cases for `OakIncrementalIndex`
   We added to all the benchmarks that are relevant to the incremental index, 
an incremental-index parametrization: on-heap, off-heap, or Oak.
   In addition, we modified these benchmarks to resolve some issues we 
encountered.
   
   <details>
   <summary>We list here the additional modifications we made to some of the 
benchmarks.</summary>
   
     * Add some additional parametrization:
        - rollup opportunity for the row generator
        - number of rows per segment
        - query order: descending/ascending
     * Modify the parametrization of the rollup opportunity for the row 
generator
        - From implicit names (`{"none", "moderate", "high"}`) to explicit 
number of repeated timestamps (`{"0", "1000", "10000"}`)
     * Add a missing `tearDown()` procedure
     * Properly close the queryable index in the `tearDown()` procedure
     * Moved any temporary folder creation and deletion to the 
`setup()`/`tearDown()` methods so they would not affect the measurements of the 
results
     * Use a predefined seed for reproducible results, to be compliant with 
most benchmarks
     * Add scopes (`@State(Scope.Benchmark)`) that allow us to test the 
incremental index without the overhead of the setup procedure of the queryable 
index benchmark
        - One scope for benchmarking queries on the incremental index
        - One scope for benchmarking queries on the queryable index
   </details>
   
   #### Add `incrementalIndexType` ingestion configuration knob
     * Add a method to the `AppenderatorConfig` interface: `String 
getIncrementalIndexType()` and update all the implementations and tests 
accordingly
     * Add a parameter to `Sink`: `String incrementalIndexType` and build the 
incremental index accordingly
     * Update `RealtimePlumber` to instantiate `Sink` with the additional  
`incrementalIndexType` parameter that is taken from the configuration
   
   <hr>
   
   This PR has:
   - [X] been self-reviewed.
   - [X] added documentation for new or modified features or behaviors.
   - [X] added Javadocs for most classes and all non-trivial methods. Linked 
related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in 
[licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
   - [X] added comments explaining the "why" and the intent of the code 
wherever it would not be obvious for an unfamiliar reader.
   - [X] added unit tests or modified existing tests to cover new code paths.
   - [ ] added integration tests.
   - [X] been tested in a test Druid cluster.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to