Akshat-Jain opened a new pull request, #16481:
URL: https://github.com/apache/druid/pull/16481

   ## Description
   
   This PR optimises S3 storage writing for MSQ durable storage by uploading 
the chunks to S3 in a separate threadpool.
   
   Currently, we create and upload each chunk to S3 sequentially, so no new chunk is created while an upload is in progress. This isn't optimal.
   
   This PR changes the logic so that chunk creation still happens in the same processing threadpool, but chunk uploads are offloaded to a separate threadpool.
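   
   The hand-off described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the class `ChunkUploadSketch`, its methods, and the in-memory `upload` stand-in are all hypothetical, and using a semaphore to enforce the "max concurrent chunks" limit is an assumption about how such a cap could work.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.Semaphore;
   import java.util.concurrent.atomic.AtomicLong;

   // Hypothetical sketch: none of these names are Druid classes.
   final class ChunkUploadSketch {
     // Dedicated pool for uploads, separate from the processing threads.
     private final ExecutorService uploadPool;
     // Assumption: caps the number of chunks that are created but not yet uploaded.
     private final Semaphore chunkPermits;
     // Only touched by the single producer thread in this sketch.
     private final List<Future<?>> pending = new ArrayList<>();
     private final AtomicLong uploadedBytes = new AtomicLong();

     ChunkUploadSketch(int threadPoolSize, int maxConcurrentChunks) {
       this.uploadPool = Executors.newFixedThreadPool(threadPoolSize);
       this.chunkPermits = new Semaphore(maxConcurrentChunks);
     }

     // Called on the processing thread once a chunk has been filled.
     void submitChunk(byte[] chunk) throws InterruptedException {
       // Blocks the producer when too many chunks are already in flight.
       chunkPermits.acquire();
       pending.add(uploadPool.submit(() -> {
         try {
           upload(chunk);
         } finally {
           chunkPermits.release();
         }
       }));
     }

     // Stand-in for the real S3 upload; it only counts bytes.
     private void upload(byte[] chunk) {
       uploadedBytes.addAndGet(chunk.length);
     }

     // Waits for all uploads, surfacing any failure, then stops the pool.
     long close() throws Exception {
       try {
         for (Future<?> f : pending) {
           f.get();
         }
       } finally {
         uploadPool.shutdown();
       }
       return uploadedBytes.get();
     }
   }
   ```
   
   In this sketch the producer only blocks when the number of in-flight chunks reaches the cap, so chunk creation and uploads overlap instead of alternating.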
   
   ## Test Plan
   
   1. For every operation, I verified that the number of rows returned with this PR's changes matches the number of rows returned without this PR's changes.
   2. For every operation, I verified the output files and their sizes (in S3) 
with this PR's changes match the corresponding files uploaded without this PR's 
changes.
   3. TODO: Need to add/update tests.
   
   ## Timing Data
   
   The following section compares query durations for a few different queries. For each query, we compare the original duration with the new duration across a few combinations of parameters.
   
   Note:
   1. `trips_xaa` is one of the sample datasources with 60M rows.
   2. All timing durations mentioned below are averages of 3 data points.
   3. All tasks used the following query context, unless a different value is explicitly mentioned in the `Other Notes` column:
   ```json
   {
     "maxNumTasks": 5,
     "durableShuffleStorage": true,
     "selectDestination": "durableStorage",
     "rowsPerPage": 10000000
   }
   ```
   
   ### Query 1
   ```sql
   select trip_id, sum(tip_amount) from "trips_xaa" group by trip_id
   ```
   
   Original timings:
   
   | Chunk Size | Time          | Other Notes     |
   | :--------- | :------------ | :-------------- |
   | 100 MB     | 10:31 minutes |                 |
   | 5 MB       | 10:51 minutes |                 |
   | 5 MB       | 9:53 minutes  | maxNumTasks = 2 |
   
   New timings:
   
   | Chunk Size | Threadpool size | Max concurrent chunks | Time          |
   | :--------- | :-------------- | :-------------------- | :------------ |
   | 100 MB     | 5               | 5                     | 8:56 minutes  |
   | 5 MB       | 5               | 5                     | 10:26 minutes |
   | 5 MB       | 10              | 10                    | 7:35 minutes  |
   | 5 MB       | 30              | 30                    | 6:52 minutes  |
   
   
   ### Query 2
   ```sql
   SELECT sum("commentLength"), "countryName" FROM "wikipedia_s3" group by 
"countryName" limit 4
   ```
   
   Original timings:
   
   | Chunk Size | Time       |
   | :--------- | :--------- |
   | 100 MB     | 32 seconds |
   | 5 MB       | 29 seconds |
   
   New timings:
   
   | Chunk Size | Threadpool size | Max concurrent chunks | Time       |
   | :--------- | :-------------- | :-------------------- | :--------- |
   | 100 MB     | 5               | 5                     | 31 seconds |
   | 5 MB       | 5               | 5                     | 28 seconds |
   | 5 MB       | 10              | 10                    | 28 seconds |
   
   
   ### Query 3
   ```sql
   SELECT "trip_id", sum(tip_amount) FROM "trips_xaa" where "fare_amount" > 100 
group by "trip_id", "tip_amount" order by "trip_id"
   ```
   
   Original timings:
   
   | Chunk Size | Time         |
   | :--------- | :----------- |
   | 100 MB     | 6:22 minutes |
   | 5 MB       | 4:56 minutes |
   
   New timings:
   
   | Chunk Size | Threadpool size | Max concurrent chunks | Time         |
   | :--------- | :-------------- | :-------------------- | :----------- |
   | 100 MB     | 5               | 5                     | 5:55 minutes |
   | 5 MB       | 5               | 5                     | 4:55 minutes |
   | 5 MB       | 10              | 10                    | 4:30 minutes |
   
   
   ### Query 4
   ```sql
   select trip_id, sum(tip_amount) from "trips_xaa" where __time < TIMESTAMP 
'2014-04-06 11:15:25' group by trip_id
   ```
   
   Original timings:
   
   | Chunk Size | Time         |
   | :--------- | :----------- |
   | 100 MB     | 1:56 minutes |
   | 5 MB       | 1:52 minutes |
   
   New timings:
   
   | Chunk Size | Threadpool size | Max concurrent chunks | Time         |
   | :--------- | :-------------- | :-------------------- | :----------- |
   | 100 MB     | 5               | 5                     | 1:45 minutes |
   | 5 MB       | 5               | 5                     | 1:45 minutes |
   | 5 MB       | 10              | 10                    | 1:44 minutes |
   
   
   ### Query 5
   ```sql
   select * from "trips_xaa" where __time < TIMESTAMP '2013-11-06 11:15:25'
   ```
   
   The optimization is much more significant for I/O-bound queries like this one, where durable storage is enabled for storing intermediate stage outputs as well as the query results. This query returned 56.8 MB of data.
   
   Original timings:
   
   | Chunk Size | Time         | Other Notes     |
   | :--------- | :----------- | :-------------- |
   | 5 MB       | 2:11 minutes | maxNumTasks = 2 |
   
   New timings:
   
   | Chunk Size | Threadpool size | Max concurrent chunks | Time       | Other Notes     |
   | :--------- | :-------------- | :-------------------- | :--------- | :-------------- |
   | 5 MB       | 10              | 10                    | 56 seconds | maxNumTasks = 2 |
   
   
   ### Query 6
   ```sql
   select * from "trips_xaa" where __time < TIMESTAMP '2014-02-06 11:15:25'
   ```
   
   This is another I/O-bound query like the previous one. This query returned 454 MB of data.
   
   Original timings:
   
   | Chunk Size | Time          | Other Notes     |
   | :--------- | :------------ | :-------------- |
   | 5 MB       | 13:50 minutes | maxNumTasks = 2 |
   
   New timings:
   
   | Chunk Size | Threadpool size | Max concurrent chunks | Time         | Other Notes     |
   | :--------- | :-------------- | :-------------------- | :----------- | :-------------- |
   | 5 MB       | 10              | 10                    | 5:08 minutes | maxNumTasks = 2 |
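   
   To make the I/O-bound results easier to compare, the speedup can be computed as the original duration divided by the new duration. The snippet below is only a worked calculation over the timings reported for Query 5 and Query 6 above; the class `SpeedupSketch` and its helpers are hypothetical names used for illustration.
   
   ```java
   import java.util.Locale;

   // Hypothetical helper for turning the reported timings into speedup ratios.
   final class SpeedupSketch {
     // Converts a "minutes:seconds" duration into total seconds.
     static int seconds(int minutes, int seconds) {
       return minutes * 60 + seconds;
     }

     // Speedup = original duration / new duration.
     static double speedup(int originalSeconds, int newSeconds) {
       return (double) originalSeconds / newSeconds;
     }

     public static void main(String[] args) {
       // Query 5: 2:11 minutes originally, 56 seconds with this PR.
       double q5 = speedup(seconds(2, 11), seconds(0, 56));
       // Query 6: 13:50 minutes originally, 5:08 minutes with this PR.
       double q6 = speedup(seconds(13, 50), seconds(5, 8));
       System.out.println(String.format(Locale.ROOT, "Query 5: %.2fx", q5)); // Query 5: 2.34x
       System.out.println(String.format(Locale.ROOT, "Query 6: %.2fx", q6)); // Query 6: 2.69x
     }
   }
   ```
   
   Both ratios use the 5 MB chunk size with `maxNumTasks = 2`, so the original and new timings are measured under the same settings apart from the upload threadpool.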
   
   
   This PR has:
   
   - [x] been self-reviewed.
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] a release note entry in the PR description.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked 
related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in 
[licenses.yaml](https://github.com/apache/druid/blob/master/dev/license.md)
   - [ ] added comments explaining the "why" and the intent of the code 
wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths, 
ensuring the threshold for [code 
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
 is met.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

