kfaraz opened a new pull request #12331:
URL: https://github.com/apache/druid/pull/12331


   ### Description
   Parallel indexing with range partitioning can often cause an OOM in the
   `ParallelIndexSupervisorTask` during the dimension distribution phase.
   This typically happens because the supervisor task holds too many `StringSketch`
   objects in memory, received from the many `partial_dimension_distribution` sub-tasks.
   
   We can reduce the number of sketches in memory by merging each one into the
   existing distribution for its interval as soon as it is received.
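
A minimal sketch of the idea, assuming a toy mergeable distribution: the hypothetical `IntervalMerger` below folds each incoming report into one merged distribution per interval, so the number of retained distributions grows with the number of intervals rather than the number of sub-task reports. A `collections.Counter` of dimension values stands in for `StringSketch`; the real sketch is not modeled here.

```python
from collections import Counter, defaultdict


class IntervalMerger:
    """Hypothetical: keeps one merged distribution per interval.

    A Counter of dimension values is a stand-in for the real StringSketch,
    chosen only because it is trivially mergeable.
    """

    def __init__(self):
        self._merged = defaultdict(Counter)

    def on_report(self, report):
        # report: dict mapping interval -> Counter of dimension values.
        # Each distribution is merged immediately, so the report itself
        # does not need to be retained.
        for interval, dist in report.items():
            self._merged[interval].update(dist)

    def distribution(self, interval):
        # Use .get so that a lookup does not create an empty entry.
        return self._merged.get(interval, Counter())

    def num_distributions(self):
        return len(self._merged)
```

Two reports for the same interval leave a single merged distribution in memory instead of two.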
   
   ### Changes
   - Extract `StringDistribution`s from `DimensionDistributionReport`s when they are received and merge them into the existing `StringDistributionMerger`
   - Add size estimation to `StringTuple` and `StringDistribution`
   - Check the estimated size whenever a new sketch is added to memory
   - Fail the task if the estimated size exceeds the amount of heap space allocated for reports
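
The size check can be sketched in the same spirit. Everything here is hypothetical: the per-object overheads, the assumption of 2 bytes per character, and the names `estimate_tuple_size` and `ReportMemoryTracker` are illustrative and are not the constants or classes used in the PR. The tracker estimates the size of each incoming distribution and fails before the running total exceeds the budget reserved for reports.

```python
# Illustrative overheads only; these are assumptions, not Druid's constants.
STRING_OVERHEAD_BYTES = 40
TUPLE_OVERHEAD_BYTES = 16
BYTES_PER_CHAR = 2  # assumption: 2 bytes per character


class MemoryBudgetExceeded(RuntimeError):
    """Raised when the estimated size of held distributions exceeds the budget."""


def estimate_tuple_size(values):
    """Rough heap estimate for one tuple of dimension values."""
    return TUPLE_OVERHEAD_BYTES + sum(
        STRING_OVERHEAD_BYTES + BYTES_PER_CHAR * len(v) for v in values
    )


def estimate_distribution_size(tuples):
    """Estimate for a distribution represented as a list of tuples."""
    return sum(estimate_tuple_size(t) for t in tuples)


class ReportMemoryTracker:
    """Hypothetical tracker of the estimated size of distributions in memory."""

    def __init__(self, max_bytes):
        self.max_bytes = max_bytes
        self.estimated_bytes = 0

    def add(self, tuples):
        new_total = self.estimated_bytes + estimate_distribution_size(tuples)
        if new_total > self.max_bytes:
            # Fail fast with a clear error instead of running out of heap later.
            raise MemoryBudgetExceeded(
                f"estimated {new_total} bytes exceeds report budget "
                f"of {self.max_bytes} bytes"
            )
        self.estimated_bytes = new_total
```

Checking on every addition keeps the failure close to its cause: the task stops at the report that would push the estimate over the limit, and the running total is left unchanged.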
   
   ### Comparison
   For an example dataset where each sub-task processed about 1 GB of data for the same interval, the typical heap usage of the `ParallelIndexSupervisorTask` is shown below.
   
   __Before__
   <img width="874" alt="before" src="https://user-images.githubusercontent.com/18635897/158237039-117ea10d-646e-415f-b0c8-c03aaa25a7f7.png">
   
   __After__
   <img width="875" alt="after" src="https://user-images.githubusercontent.com/18635897/158237205-09c9351a-f449-44b6-bf09-d0adb9464c7f.png">
   
   
   ### Other possible approaches
   (a) Write the merged distribution for a given interval to disk and read it back only when required.
   - Possible future work?
   - The impact of the increased disk I/O on the execution of the `ParallelIndexSupervisorTask` needs to be evaluated.
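
Approach (a) could look roughly like the following hypothetical store, which pickles the merged distribution for each interval into a temporary directory and loads it only when needed. Every merge here is a read followed by a write, which is the kind of extra disk I/O whose impact would need to be evaluated. As before, a `Counter` stands in for the real distribution, and `SpillingDistributionStore` is an illustrative name.

```python
import os
import pickle
import tempfile
from collections import Counter


class SpillingDistributionStore:
    """Hypothetical: keeps merged per-interval distributions on disk."""

    def __init__(self, directory=None):
        self._dir = directory or tempfile.mkdtemp(prefix="dim-dist-")
        self._paths = {}

    def _path(self, interval):
        # Interval strings contain '/', so use a positional file name instead.
        if interval not in self._paths:
            name = f"interval-{len(self._paths)}.pkl"
            self._paths[interval] = os.path.join(self._dir, name)
        return self._paths[interval]

    def merge(self, interval, dist):
        # Read the current merged state, fold in the new report, write it back.
        merged = self.load(interval)
        merged.update(dist)
        with open(self._path(interval), "wb") as f:
            pickle.dump(merged, f)

    def load(self, interval):
        # Only this interval's distribution is brought into memory.
        path = self._paths.get(interval)
        if path is None:
            return Counter()
        with open(path, "rb") as f:
            return pickle.load(f)
```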
   
   (b) Streaming merge
   - Sub-tasks send back reports in the order of their intervals
   - This would be most effective if the input dataset is sorted by time
   - Sub-tasks would then stream their reports to the supervisor task in the right order of intervals.
   - The supervisor task needs to keep only a single merged distribution in memory, for the interval currently being processed, thus completely eliminating the memory pressure.
   - Requires a relatively more involved implementation
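
The streaming variant in (b) reduces to a group-by over an interval-ordered stream. This sketch assumes reports arrive as `(interval, distribution)` pairs already sorted by interval; `streaming_merge` is a hypothetical name, and a `Counter` again stands in for the real distribution. It raises if an interval reappears after a different one, since that would break the one-distribution-at-a-time property.

```python
from collections import Counter
from itertools import groupby
from operator import itemgetter


def streaming_merge(reports):
    """Yield (interval, merged distribution) once per interval.

    `reports` must be an iterable of (interval, Counter) pairs in which all
    reports for an interval are consecutive.
    """
    seen = set()
    for interval, group in groupby(reports, key=itemgetter(0)):
        if interval in seen:
            raise ValueError(f"reports for interval {interval} arrived out of order")
        seen.add(interval)
        merged = Counter()
        for _, dist in group:
            merged.update(dist)
        # The previous interval's distribution has already been handed to the
        # consumer, so only `merged` is built up here.
        yield interval, merged
```

Because `groupby` only groups consecutive items, the approach depends on sub-tasks emitting reports in interval order, which is why it would work best on time-sorted input.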
   
   This PR has:
   - [ ] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [x] added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for [code coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md) is met.
   - [ ] added integration tests.
   - [x] been tested in a test Druid cluster.
   

