LakshSingla commented on PR #15420: URL: https://github.com/apache/druid/pull/15420#issuecomment-1970631403
@gianm With the recent changes made to the PR, I wanted to revisit the questions posed [here](https://github.com/apache/druid/pull/15420#issuecomment-1878164205), plus a few more points that will help in reviewing.

**Assumptions**

There's an attempt to link the places where merge buffers are acquired (`mergeResults`) and where they are utilized (`mergeResults` and `mergeRunners`). However, Druid's code doesn't provide any explicit contract between the arguments of these methods: the input to `mergeResults` can be any runner, and it should function the same. While this gives the methods flexibility and reusability, it also means the code makes some implicit assumptions about what type of runner is passed to `mergeResults`:

1. For a given query on a given server, only a single top-level `mergeResults` call is made, which collects the results from the various runners. The code will break if there are multiple nested `mergeResults` calls (unnested calls are fine, though they don't happen in practice).
2. There can be multiple `mergeRunners` calls, because `GroupByMergingQueryRunner` only needs the merge buffers for the top-level query runner; nested ones execute in an unoptimized way.
3. `mergeResults` needs to know whether the query runner passed to it is the one created by the corresponding toolchest's `mergeRunners` (the typical use case). This is encoded in the argument `willMergeRunner`, which is set by the callers. The only production use case where this isn't true is when the broker merges the results gathered from the historicals.

These assumptions hold to my knowledge at the time of this PR (and they should remain true unless there's some whacky change in the query stack). They also only need to hold for group-by queries, because only those require shared resources; the other query types don't rely on any of these assumptions being correct.
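To make the `willMergeRunner` assumption concrete, here is a minimal hypothetical sketch (the class, method names, and buffer counts are illustrative, not Druid's actual API) of how a top-level `mergeResults` could size its reservation depending on whether the runner beneath it came from the toolchest's `mergeRunners`:

```java
// Hypothetical sketch: how the willMergeRunner flag could drive the number
// of merge buffers a top-level mergeResults reserves. Numbers and names are
// illustrative only, not Druid's actual implementation.
public class MergeBufferBudget
{
  /** Buffers mergeResults needs for its own work; illustrative values. */
  static int buffersForMergeResults(boolean hasSubtotals)
  {
    return hasSubtotals ? 2 : 1;
  }

  /**
   * If the runner passed to mergeResults was produced by the toolchest's
   * mergeRunners (willMergeRunner == true), the reservation must also cover
   * the GroupByMergingQueryRunner's buffer. Otherwise (e.g. a broker merging
   * results from historicals), only mergeResults' own buffers are reserved.
   */
  static int totalToReserve(boolean hasSubtotals, boolean willMergeRunner)
  {
    return buffersForMergeResults(hasSubtotals) + (willMergeRunner ? 1 : 0);
  }
}
```

The key point the sketch captures is that the caller, not `mergeResults` itself, knows whether a `GroupByMergingQueryRunner` will run underneath, which is why the flag has to be threaded through as an argument.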
**Resource ID**

Each query is assigned a unique resource id when it enters the queryable server:

* For brokers: in `ClientQuerySegmentWalker`
* For historicals: in `ServerManager`
* For peons: in `SinkQuerySegmentWalker`

These three classes are among the first places a query reaches when it begins processing, so if the resource id is allotted only at these places, it is guaranteed that no one will overwrite it during execution.

Note: Historicals and peons could have reused the query id allotted by the broker, but they assign their own because: a) the user can choose to query a data server directly (while debugging, etc.); b) UNIONs are treated as multiple separate queries when the broker sends them to the historicals, so each part of the union requires its own unique id. We therefore reassign the resource id per part, or else the parts would end up sharing the same resource id.

**Tests modifications**

This section lays out the modifications made to the test cases. Given the assumptions above, we need to modify the tests, because they mimic the broker-historical interaction in many places, which can make the code misbehave since the assumptions don't hold there. For example, many test cases make two nested `mergeResults` calls: the outer call mimics what the broker does, while the inner one mimics what the historical does. Therefore, we need to assign a unique resource id between the `mergeResults` calls, and also make sure that the top-level `mergeResults` has `willMergeRunner = false` (since it is called on top of another `mergeResults` runner), while the inner one has `willMergeRunner = true` (since it is called on actual runners).
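The allot-once rule and the UNION case can be sketched as follows. This is a hypothetical illustration (the `QueryResourceId` class and its methods are made up for this comment, not Druid code); the real allotment happens inside the three segment walkers named above:

```java
import java.util.UUID;

// Hypothetical sketch of allotting a resource id exactly once, at the point
// where the query first enters a queryable server (ClientQuerySegmentWalker
// on brokers, ServerManager on historicals, SinkQuerySegmentWalker on peons).
public class QueryResourceId
{
  private final String id;

  private QueryResourceId(String id)
  {
    this.id = id;
  }

  /** Allot a fresh, unique resource id; called only at the entry points above. */
  static QueryResourceId allot()
  {
    return new QueryResourceId(UUID.randomUUID().toString());
  }

  /**
   * The broker sends each part of a UNION to the historicals as a separate
   * query, so each part gets its own resource id rather than sharing the
   * broker-assigned query id. The naming scheme here is illustrative.
   */
  static QueryResourceId allotForUnionPart(String brokerQueryId, int partNumber)
  {
    return new QueryResourceId(brokerQueryId + "_part_" + partNumber);
  }

  String get()
  {
    return id;
  }
}
```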
Multiple attempts to insert the same resource id will fail, so resources are allocated only once per query, as long as the resource id doesn't change during the query's execution. A pool entry is cleaned once `close()` is called on the reserved resources, and the mapping is removed, ensuring that the map doesn't keep growing as queries execute.

The call to allocate merge buffers from the pool is made by `mergeResults`, which allocates the resources required for its own execution, plus those for the execution of the `GroupByMergingQueryRunner` if `willMergeRunners = true`. The `GroupByMergingQueryRunner` doesn't allocate any resources itself; it assumes the resources have been preallocated and just takes them from the pool.

Once the required merge buffers have been allocated from the pool, they cannot be used by other queries until `close()` is called on the `GroupByQueryResource`. This is usually done via `GroupByResourcesReservationPool#clean()`, which closes the resources and also cleans up the mapping. While the `GroupByQueryResource` is unclosed, merge buffers can be taken from it and given back as needed during the query's execution. The resources are not released back to the global pool at that point; giving a buffer back only signals that the work of that execution unit is complete and the buffer can be reused (or closed safely). Closing the `GroupByQueryResources` while some merge buffers have not been returned by the individual execution units logs a warning but doesn't throw; the resources get freed up, and if an execution unit was actually still using them, it can error out.
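The insert-once and clean-on-close semantics described above can be captured in a small sketch. This is a deliberately simplified, hypothetical model (not the actual `GroupByResourcesReservationPool`, which hands out real `ByteBuffer`s and blocks with timeouts); it only shows the map-based bookkeeping:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the reservation-pool bookkeeping:
// a resource id can reserve buffers at most once, and clean() both frees
// the buffers and removes the mapping so the map doesn't grow unboundedly.
public class ReservationPoolSketch
{
  private final Map<String, Integer> reserved = new HashMap<>();
  private final int capacity;
  private int used = 0;

  public ReservationPoolSketch(int capacity)
  {
    this.capacity = capacity;
  }

  /** Reserve buffers for a resource id; a second insert for the same id fails. */
  public synchronized boolean reserve(String resourceId, int numBuffers)
  {
    if (reserved.containsKey(resourceId) || used + numBuffers > capacity) {
      return false; // duplicate resource id, or not enough buffers in the pool
    }
    reserved.put(resourceId, numBuffers);
    used += numBuffers;
    return true;
  }

  /** Free the reservation and remove the mapping for this resource id. */
  public synchronized void clean(String resourceId)
  {
    Integer numBuffers = reserved.remove(resourceId);
    if (numBuffers != null) {
      used -= numBuffers;
    }
  }

  public synchronized int buffersInUse()
  {
    return used;
  }
}
```

In this model, a duplicate `reserve` for the same id fails exactly as described above, and once `clean` runs, the same id could reserve again, mirroring the "mapping is removed on close" behavior.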
