yashmayya opened a new pull request, #17323: URL: https://github.com/apache/pinot/pull/17323
- This is an important bugfix for the multi-stage query engine. Currently, certain kinds of join queries can intermittently return incorrect results.
- For instance, if both inputs to a join are grouping aggregates whose group keys are the same as the join keys, the MSE query planner infers that a "pre-partitioned" send can be used, where workers are connected 1:1 between the stages (when there is no parallelism), instead of doing a full partitioned shuffle (see [PinotRelDistributionTraitRule](https://github.com/apache/pinot/blob/43142cdc375e805288fd911c08e5d7b22177fa2d/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java#L63), [RelToPlanNodeConverter](https://github.com/apache/pinot/blob/43142cdc375e805288fd911c08e5d7b22177fa2d/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java#L192-L195), [MailboxAssignmentVisitor](https://github.com/apache/pinot/blob/43142cdc375e805288fd911c08e5d7b22177fa2d/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java#L94-L124)).
- For this to work correctly, the same sender worker ID must correspond to the same partition / hash bucket of the grouping keys (which are also the join keys in the downstream stage). However, this is currently a fragile assumption: it relies on non-deterministic `HashMap` iteration order during worker assignment at various places in [WorkerManager](https://github.com/apache/pinot/blob/43142cdc375e805288fd911c08e5d7b22177fa2d/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java#L106).
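To illustrate the correctness condition described above, here is a toy sketch (not actual Pinot code; the class, method names, and worker mappings are made up for illustration) of why a pre-partitioned 1:1 send is only correct when both join branches map the same hash bucket to the same worker ID:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: keys are hashed into NUM_WORKERS buckets, and each branch has
// its own partition -> worker mapping. A 1:1 (identity) send between stages
// joins rows only when both branches put a given key on the same worker ID.
public class PrePartitionedSendSketch {
  static final int NUM_WORKERS = 3;

  // For each key, the worker ID it lands on under this branch's mapping,
  // where partitionToWorker[p] is the worker holding hash bucket p.
  static Map<String, Integer> workerForKeys(List<String> keys, int[] partitionToWorker) {
    Map<String, Integer> result = new HashMap<>();
    for (String key : keys) {
      int partition = Math.floorMod(key.hashCode(), NUM_WORKERS);
      result.put(key, partitionToWorker[partition]);
    }
    return result;
  }

  // The join over a 1:1 send is correct only if every key lands on the
  // same worker ID in both branches.
  static boolean joinIsCorrect(List<String> keys, int[] leftMap, int[] rightMap) {
    return workerForKeys(keys, leftMap).equals(workerForKeys(keys, rightMap));
  }
}
```

If the two branches assign partitions to workers in different orders (which non-deterministic `HashMap` iteration can cause), `joinIsCorrect` returns `false`: matching keys end up on different workers and the join silently drops rows.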
- The fix is simple: we ensure that the mailbox list used for data routing in [HashExchange](https://github.com/apache/pinot/blob/43142cdc375e805288fd911c08e5d7b22177fa2d/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java#L55-L78) is sorted by the receiver worker ID. Note that the v2 physical planner already does something similar [here](https://github.com/apache/pinot/blob/43142cdc375e805288fd911c08e5d7b22177fa2d/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java#L256-L263), but with a difference: it sorts the `MailboxInfo` list directly, which can have multiple workers assigned per server depending on the parallelism (and it assumes that the workers assigned to a server are contiguous, so it sorts simply on the first worker).
- With this fix, it is now guaranteed that the same partitions / hash buckets land on workers with the same IDs on all branches of a query plan (when the number of workers is the same), making pre-partitioned sends deterministically correct. Note that the underlying physical servers can differ, and that's okay here, since we're only doing a pre-partitioned send and not assuming full colocation.
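The essence of the fix can be sketched as follows (a minimal illustration, not the actual `HashExchange` implementation; the `Mailbox` record and method names here are hypothetical): sorting the routable mailboxes by receiver worker ID before computing the hash bucket makes the bucket-to-worker mapping independent of the order in which the mailbox list was assembled.

```java
import java.util.Comparator;
import java.util.List;

// Deterministic hash routing: hash bucket i always goes to the worker with
// the i-th smallest worker ID, regardless of the mailbox list's build order.
public class SortedHashRoutingSketch {
  // Hypothetical mailbox: just the receiver worker ID it delivers to.
  record Mailbox(int receiverWorkerId) {}

  static int routeToWorker(List<Mailbox> mailboxes, int keyHash) {
    // Sort by receiver worker ID so the mapping is stable across branches.
    List<Mailbox> sorted = mailboxes.stream()
        .sorted(Comparator.comparingInt(Mailbox::receiverWorkerId))
        .toList();
    int bucket = Math.floorMod(keyHash, sorted.size());
    return sorted.get(bucket).receiverWorkerId();
  }
}
```

With this, two branches that build their mailbox lists in different orders still route the same key hash to the same worker ID, which is exactly the property a pre-partitioned 1:1 send relies on.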
