This is an automated email from the ASF dual-hosted git repository.
chrispeck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7c3d24ee57 [multistage] Fix Bug in MailboxInfo Ordering (#16224)
7c3d24ee57 is described below
commit 7c3d24ee57dceb5b086e2495f80ce284276047b2
Author: Ankit Sultana <[email protected]>
AuthorDate: Mon Jun 30 11:59:06 2025 -0500
[multistage] Fix Bug in MailboxInfo Ordering (#16224)
---
.../planner/physical/v2/PlanFragmentAndMailboxAssignment.java | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
index abaa363daf..bfc200981d 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PlanFragmentAndMailboxAssignment.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.planner.physical.v2;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -253,6 +254,14 @@ public class PlanFragmentAndMailboxAssignment {
for (var entry : workersByUniqueHostPort.entrySet()) {
result.add(new MailboxInfo(entry.getKey().getHostname(),
entry.getKey().getQueryMailboxPort(), entry.getValue()));
}
+ // IMP: Return mailbox info sorted by workerIds. This is because SendingMailbox are created in this order, and
+ // record assignment for hash exchange follows modulo arithmetic. e.g. if we have sending mailbox in order:
+ // [worker-1, worker-0], then records with modulo 0 hash would end up in worker-1.
+ // Note that the workerIds list will be >1 in length only when there's a parallelism change. It's important to
+ // also know that MailboxSendOperator will iterate over this List<MailboxInfo> in order, and within each iteration
+ // iterate over all the workerIds of that MailboxInfo. The result List<SendingMailbox> is used for modulo
+ // arithmetic for any partitioning exchange strategy.
+ result.sort(Comparator.comparingInt(info -> info.getWorkerIds().get(0)));
return result;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]