yashmayya opened a new pull request, #17323:
URL: https://github.com/apache/pinot/pull/17323

   - This is an important bugfix for the multi-stage query engine. Currently, 
certain kinds of join queries can intermittently return incorrect results.
   - For instance, if the two inputs to a join are both grouping aggregates 
whose group keys are the same as the join keys, the MSE query planner infers 
that we can do a "pre-partitioned" send, where workers in the two stages are 
connected 1:1 (when there is no parallelism), instead of a full partitioned 
shuffle 
(see 
[PinotRelDistributionTraitRule](https://github.com/apache/pinot/blob/43142cdc375e805288fd911c08e5d7b22177fa2d/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotRelDistributionTraitRule.java#L63),
 
[RelToPlanNodeConverter](https://github.com/apache/pinot/blob/43142cdc375e805288fd911c08e5d7b22177fa2d/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/logical/RelToPlanNodeConverter.java#L192-L195),
 
[MailboxAssignmentVisitor](https://github.com/apache/pinot/blob/43142cdc375e805288fd911c08e5d7b22177fa2d/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java#L94-L124)).
   - An important assumption for the above to work correctly is that the same 
sender worker ID corresponds to the same partition / hash bucket for the 
grouping keys (which are also the join keys in the downstream stage). However, 
this assumption is fragile in the current state of things: it relies on 
non-deterministic `HashMap` iteration order during worker assignment in several 
places in 
[WorkerManager](https://github.com/apache/pinot/blob/43142cdc375e805288fd911c08e5d7b22177fa2d/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java#L106).
   - The fix here is simple: we ensure that the mailbox list used for data 
routing in 
[HashExchange](https://github.com/apache/pinot/blob/43142cdc375e805288fd911c08e5d7b22177fa2d/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/exchange/HashExchange.java#L55-L78)
 is sorted by the receiver worker ID. Note that the v2 physical planner already 
does something similar 
[here](https://github.com/apache/pinot/blob/43142cdc375e805288fd911c08e5d7b22177fa2d/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java#L256-L263)
 but the difference is that it sorts the `MailboxInfo` list directly, which can 
contain multiple workers per server depending on the parallelism (and it 
assumes that the workers assigned to a server are contiguous, so it sorts 
simply on the first worker ID).
   - With the fix here, it's now guaranteed that the same partitions / hash 
buckets will land on workers with the same IDs on all branches in a query plan 
(when the number of workers is the same), thus making pre-partitioned sends 
deterministically correct. Note that the underlying physical servers can be 
different and that's okay here, since we're simply doing a pre-partitioned send 
and not assuming full colocation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

