This is an automated email from the ASF dual-hosted git repository.
Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 594428df356 Fall back to cross-server send for SINGLETON exchange when
workers diverge (#18775)
594428df356 is described below
commit 594428df3565f0efe4e29054cb65b5b2e4d3dc0f
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Jun 16 00:26:19 2026 -0700
Fall back to cross-server send for SINGLETON exchange when workers diverge
(#18775)
---
.../planner/physical/MailboxAssignmentVisitor.java | 147 +++++++++++----------
.../physical/MailboxAssignmentVisitorTest.java | 97 ++++++++++++++
2 files changed, 175 insertions(+), 69 deletions(-)
diff --git
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index 233874017ae..41076e49d0e 100644
---
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++
b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -55,84 +55,29 @@ public class MailboxAssignmentVisitor extends
DefaultPostOrderTraversalVisitor<V
int numSenders = senderServerMap.size();
int numReceivers = receiverServerMap.size();
if (sendNode.getDistributionType() == RelDistribution.Type.SINGLETON) {
- // NOTE: We use SINGLETON to represent local exchange. The actual
distribution type is determined by the
- // parallelism.
- if (numSenders == numReceivers) {
- // Send the data to the same instance (same worker id) with
SINGLETON distribution type
- for (int workerId = 0; workerId < numSenders; workerId++) {
- QueryServerInstance senderServer = senderServerMap.get(workerId);
- QueryServerInstance receiverServer =
receiverServerMap.get(workerId);
- Preconditions.checkState(senderServer.equals(receiverServer),
- "Got different server for SINGLETON distribution type for
worker id: %s, sender: %s, receiver: %s",
- workerId, senderServer, receiverServer);
- MailboxInfos mailboxInfos = new SharedMailboxInfos(
- new MailboxInfo(senderServer.getHostname(),
senderServer.getQueryMailboxPort(), List.of(workerId)));
- senderMailboxesMap.computeIfAbsent(workerId, k -> new
HashMap<>()).put(receiverStageId, mailboxInfos);
- receiverMailboxesMap.computeIfAbsent(workerId, k -> new
HashMap<>()).put(senderStageId, mailboxInfos);
- }
- } else {
- // Hash distribute the data to multiple receivers on the same
server instance
+ // NOTE: We use SINGLETON to represent a local exchange. The actual
distribution type is determined by the
+ // parallelism: 1-to-1 when sender and receiver have the same
number of workers, otherwise the data is
+ // hash distributed to the parallel receivers on each server.
computeDirectExchange handles the
+ // co-location assumption and its cross-server fallback.
+ if (numSenders != numReceivers) {
+ // Local exchange with parallelism: hash distribute to the
parallel receivers, so keys are required.
// TODO: Support local exchange with parallelism but no key
Preconditions.checkState(!sendNode.getKeys().isEmpty(), "Local
exchange with parallelism requires keys");
sendNode.setDistributionType(RelDistribution.Type.HASH_DISTRIBUTED);
-
Preconditions.checkState(numReceivers % numSenders == 0,
"Number of receivers: %s should be a multiple of number of
senders: %s for local exchange",
numReceivers, numSenders);
- int parallelism = numReceivers / numSenders;
- int receiverWorkerId = 0;
- for (int senderWorkerId = 0; senderWorkerId < numSenders;
senderWorkerId++) {
- QueryServerInstance senderServer =
senderServerMap.get(senderWorkerId);
- QueryServerInstance receiverServer =
receiverServerMap.get(receiverWorkerId);
- Preconditions.checkState(senderServer.equals(receiverServer),
- "Got different server for local exchange, sender %s: %s,
receiver %s: %s", senderWorkerId,
- senderServer, receiverWorkerId, receiverServer);
- computeDirectExchangeWithParallelism(senderMailboxesMap,
receiverMailboxesMap, senderStageId,
- receiverStageId, senderWorkerId, receiverWorkerId,
senderServer, receiverServer, parallelism);
- receiverWorkerId += parallelism;
- }
}
+ int parallelism = numReceivers / numSenders;
+ computeDirectExchange(senderMailboxesMap, receiverMailboxesMap,
senderStageId, receiverStageId,
+ senderServerMap, receiverServerMap, numSenders, parallelism);
} else if (senderMetadata.isPrePartitioned() &&
isDirectExchangeCompatible(senderMetadata, receiverMetadata)) {
- // Direct exchange possible:
- // - Without parallelism:
- // Send the data to the worker with the same worker id (not
necessary the same instance), 1-to-1 mapping
- // - With parallelism:
- // Fanout based on parallelism from each sender workerID to
sequentially increment receiver workerIDs
+ // Direct exchange: the data is already pre-partitioned, so send it
1-to-1 to the worker with the same worker
+ // id (with parallelism, fan out each sender worker to a contiguous
range of receiver workers). The
+ // co-location handling is the same as SINGLETON, see
computeDirectExchange.
int parallelism = numReceivers / numSenders;
- if (parallelism == 1) {
- // 1-to-1 mapping
- for (int workerId = 0; workerId < numSenders; workerId++) {
- QueryServerInstance senderServer = senderServerMap.get(workerId);
- QueryServerInstance receiverServer =
receiverServerMap.get(workerId);
- List<Integer> workerIds = List.of(workerId);
- MailboxInfos senderMailboxInfos;
- MailboxInfos receiverMailboxInfos;
- if (senderServer.equals(receiverServer)) {
- senderMailboxInfos = new SharedMailboxInfos(
- new MailboxInfo(senderServer.getHostname(),
senderServer.getQueryMailboxPort(), workerIds));
- receiverMailboxInfos = senderMailboxInfos;
- } else {
- senderMailboxInfos = new MailboxInfos(
- new MailboxInfo(senderServer.getHostname(),
senderServer.getQueryMailboxPort(), workerIds));
- receiverMailboxInfos = new MailboxInfos(
- new MailboxInfo(receiverServer.getHostname(),
receiverServer.getQueryMailboxPort(), workerIds));
- }
- senderMailboxesMap.computeIfAbsent(workerId, k -> new
HashMap<>())
- .put(receiverStageId, receiverMailboxInfos);
- receiverMailboxesMap.computeIfAbsent(workerId, k -> new
HashMap<>())
- .put(senderStageId, senderMailboxInfos);
- }
- } else {
- // 1-to-<parallelism> mapping
- int receiverWorkerId = 0;
- for (int senderWorkerId = 0; senderWorkerId < numSenders;
senderWorkerId++) {
- QueryServerInstance senderServer =
senderServerMap.get(senderWorkerId);
- QueryServerInstance receiverServer =
receiverServerMap.get(receiverWorkerId);
- computeDirectExchangeWithParallelism(senderMailboxesMap,
receiverMailboxesMap, senderStageId,
- receiverStageId, senderWorkerId, receiverWorkerId,
senderServer, receiverServer, parallelism);
- receiverWorkerId += parallelism;
- }
- }
+ computeDirectExchange(senderMailboxesMap, receiverMailboxesMap,
senderStageId, receiverStageId,
+ senderServerMap, receiverServerMap, numSenders, parallelism);
} else {
// For other exchange types, send the data to all the instances in
the receiver fragment
// TODO: Add support for more exchange types
@@ -144,6 +89,70 @@ public class MailboxAssignmentVisitor extends
DefaultPostOrderTraversalVisitor<V
return null;
}
+ /// Wires the mailboxes for a direct exchange where sender worker `i` sends
to receiver worker `i` (or, with
+ /// `parallelism` > 1, fans out to a contiguous range of `parallelism`
receiver workers). Shared by the SINGLETON
+ /// (local exchange) and pre-partitioned exchange paths.
+ ///
+ /// Sender worker `i` and receiver worker `i` are normally co-located on the
same server, because the receiver worker
+ /// map is derived from the sender's (see
`WorkerManager#assignWorkersForLocalExchange`). When they are co-located the
+ /// data is exchanged in-process via a single shared local mailbox with no
network hop.
+ ///
+ /// They are not guaranteed to be co-located, so this does not assert it. A
colocated semi-join is the known case where they can diverge:
+ /// `PinotJoinToDynamicBroadcastRule` emits a SINGLETON `PIPELINE_BREAKER`
exchange on the build side purely from the
+ /// `is_colocated_by_join_keys` hint, and the build (sender) leaf and join
(receiver) leaf are routed independently
+ /// per table. During a rolling restart some segments are temporarily not
queryable, so each side may reroute a
+ /// partition to a different replica, leaving worker `i` on different
servers. Rather than failing the query, we fall
+ /// back to a cross-server send: the exchange stays correct because worker
id still maps to the same partition on both
+ /// sides, and we only lose locality (one extra network hop) until routing
re-stabilizes.
+ private void computeDirectExchange(Map<Integer, Map<Integer, MailboxInfos>>
senderMailboxesMap,
+ Map<Integer, Map<Integer, MailboxInfos>> receiverMailboxesMap, Integer
senderStageId, Integer receiverStageId,
+ Map<Integer, QueryServerInstance> senderServerMap, Map<Integer,
QueryServerInstance> receiverServerMap,
+ int numSenders, int parallelism) {
+ if (parallelism == 1) {
+ // 1-to-1 mapping
+ for (int workerId = 0; workerId < numSenders; workerId++) {
+ QueryServerInstance senderServer = senderServerMap.get(workerId);
+ QueryServerInstance receiverServer = receiverServerMap.get(workerId);
+ List<Integer> workerIds = List.of(workerId);
+ MailboxInfos senderMailboxInfos;
+ MailboxInfos receiverMailboxInfos;
+ if (senderServer.equals(receiverServer)) {
+ // Co-located: share one local mailbox, exchanged in-process with no
network hop.
+ MailboxInfos sharedMailboxInfos = new SharedMailboxInfos(
+ new MailboxInfo(senderServer.getHostname(),
senderServer.getQueryMailboxPort(), workerIds));
+ senderMailboxInfos = sharedMailboxInfos;
+ receiverMailboxInfos = sharedMailboxInfos;
+ } else {
+ // Not co-located: cross-server send via separate sender and
receiver mailboxes.
+ // TODO: Match the sender and receiver worker assignments upfront so
co-located data never crosses the wire.
+ // The sender and receiver leaves are routed independently per
table today; co-routing them onto a shared
+ // worker-to-server map (picking, per partition, a server that
hosts the partition for both sides) would
+ // keep this exchange local even during a rolling restart, falling
back to a cross-server send only for
+ // partitions that genuinely cannot be co-located. See
PinotJoinToDynamicBroadcastRule and WorkerManager's
+ // leaf worker assignment.
+ senderMailboxInfos = new MailboxInfos(
+ new MailboxInfo(senderServer.getHostname(),
senderServer.getQueryMailboxPort(), workerIds));
+ receiverMailboxInfos = new MailboxInfos(
+ new MailboxInfo(receiverServer.getHostname(),
receiverServer.getQueryMailboxPort(), workerIds));
+ }
+ senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
+ .put(receiverStageId, receiverMailboxInfos);
+ receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
+ .put(senderStageId, senderMailboxInfos);
+ }
+ } else {
+ // 1-to-<parallelism> mapping
+ int receiverWorkerId = 0;
+ for (int senderWorkerId = 0; senderWorkerId < numSenders;
senderWorkerId++) {
+ QueryServerInstance senderServer = senderServerMap.get(senderWorkerId);
+ QueryServerInstance receiverServer =
receiverServerMap.get(receiverWorkerId);
+ computeDirectExchangeWithParallelism(senderMailboxesMap,
receiverMailboxesMap, senderStageId,
+ receiverStageId, senderWorkerId, receiverWorkerId, senderServer,
receiverServer, parallelism);
+ receiverWorkerId += parallelism;
+ }
+ }
+ }
+
private void computeDirectExchangeWithParallelism(Map<Integer, Map<Integer,
MailboxInfos>> senderMailboxesMap,
Map<Integer, Map<Integer, MailboxInfos>> receiverMailboxesMap, Integer
senderStageId, Integer receiverStageId,
int senderWorkerId, int receiverWorkerId, QueryServerInstance
senderServer, QueryServerInstance receiverServer,
diff --git
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitorTest.java
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitorTest.java
index 9df5eae9cac..6c3ec80ab3b 100644
---
a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitorTest.java
+++
b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitorTest.java
@@ -21,12 +21,22 @@ package org.apache.pinot.query.planner.physical;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.query.QueryEnvironmentTestBase;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.routing.MailboxInfo;
import org.apache.pinot.query.routing.MailboxInfos;
+import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.routing.SharedMailboxInfos;
import org.apache.pinot.query.routing.WorkerMetadata;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -69,6 +79,93 @@ public class MailboxAssignmentVisitorTest extends
QueryEnvironmentTestBase {
verifyAllMailboxInfosSorted(subPlan, query);
}
+ private static final int SENDER_STAGE = 1;
+ private static final int RECEIVER_STAGE = 0;
+
+ /// A SINGLETON local exchange where sender worker `i` and receiver worker
`i` land on different servers (as can
+ /// happen for a colocated semi-join during a rolling restart) must not
fail: it falls back to a cross-server send
+ /// for the diverged worker while keeping the co-located worker local.
+ @Test
+ public void testSingletonFallsBackToCrossServerWhenWorkersDiverge() {
+ // Worker 0 co-located on A; worker 1 diverged (sender on B, receiver on
C).
+ DispatchablePlanMetadata sender = metadata(Map.of(0, server("A"), 1,
server("B")));
+ DispatchablePlanMetadata receiver = metadata(Map.of(0, server("A"), 1,
server("C")));
+ process(singletonSendNode(List.of()), sender, receiver);
+
+ Map<Integer, Map<Integer, MailboxInfos>> senderMailboxes =
sender.getWorkerIdToMailboxesMap();
+ Map<Integer, Map<Integer, MailboxInfos>> receiverMailboxes =
receiver.getWorkerIdToMailboxesMap();
+
+ // Worker 0 is co-located: a single shared local mailbox on host_A on both
sides.
+ assertTrue(senderMailboxes.get(0).get(RECEIVER_STAGE) instanceof
SharedMailboxInfos);
+ assertEquals(singleMailbox(senderMailboxes, 0,
RECEIVER_STAGE).getHostname(), "host_A");
+ assertEquals(singleMailbox(receiverMailboxes, 0,
SENDER_STAGE).getHostname(), "host_A");
+
+ // Worker 1 diverged: cross-server send, not a shared mailbox. The sender
sends to the receiver's server (C) and
+ // the receiver reads from the sender's server (B).
+ assertFalse(senderMailboxes.get(1).get(RECEIVER_STAGE) instanceof
SharedMailboxInfos);
+ assertEquals(singleMailbox(senderMailboxes, 1,
RECEIVER_STAGE).getHostname(), "host_C");
+ assertEquals(singleMailbox(receiverMailboxes, 1,
SENDER_STAGE).getHostname(), "host_B");
+ }
+
+ /// An unequal-but-non-multiple worker count (2 senders, 3 receivers) must
be rejected rather than rounding the
+ /// parallelism down to 1 and silently dropping the extra receiver.
+ @Test(expectedExceptions = IllegalStateException.class,
+ expectedExceptionsMessageRegExp = ".*multiple of number of senders.*")
+ public void testSingletonRejectsNonMultipleReceiverCount() {
+ DispatchablePlanMetadata sender = metadata(Map.of(0, server("A"), 1,
server("B")));
+ DispatchablePlanMetadata receiver = metadata(Map.of(0, server("A"), 1,
server("B"), 2, server("C")));
+ process(singletonSendNode(List.of(0)), sender, receiver);
+ }
+
+ /// A SINGLETON local exchange with parallelism (more receivers than
senders) does not assert co-location either: it
+ /// rewrites the distribution to HASH_DISTRIBUTED and fans each sender worker out to its
receiver workers, even cross-server.
+ @Test
+ public void testSingletonWithParallelismAllowsCrossServer() {
+ // 1 sender on A, 2 receivers on B (parallelism 2), so the fan-out is
cross-server.
+ DispatchablePlanMetadata sender = metadata(Map.of(0, server("A")));
+ DispatchablePlanMetadata receiver = metadata(Map.of(0, server("B"), 1,
server("B")));
+ MailboxSendNode sendNode = singletonSendNode(List.of(0));
+ process(sendNode, sender, receiver);
+
+ assertEquals(sendNode.getDistributionType(),
RelDistribution.Type.HASH_DISTRIBUTED);
+ MailboxInfo senderToReceiver =
singleMailbox(sender.getWorkerIdToMailboxesMap(), 0, RECEIVER_STAGE);
+ assertEquals(senderToReceiver.getHostname(), "host_B");
+ assertEquals(senderToReceiver.getWorkerIds(), List.of(0, 1));
+ assertEquals(singleMailbox(receiver.getWorkerIdToMailboxesMap(), 0,
SENDER_STAGE).getHostname(), "host_A");
+ assertEquals(singleMailbox(receiver.getWorkerIdToMailboxesMap(), 1,
SENDER_STAGE).getHostname(), "host_A");
+ }
+
+ private static QueryServerInstance server(String id) {
+ return new QueryServerInstance(id, "host_" + id, 1, 1);
+ }
+
+ private static DispatchablePlanMetadata metadata(Map<Integer,
QueryServerInstance> workerIdToServerInstanceMap) {
+ DispatchablePlanMetadata metadata = new DispatchablePlanMetadata();
+ metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
+ return metadata;
+ }
+
+ private static MailboxSendNode singletonSendNode(List<Integer> keys) {
+ DataSchema dataSchema = new DataSchema(new String[]{"col"}, new
ColumnDataType[]{ColumnDataType.INT});
+ return new MailboxSendNode(SENDER_STAGE, dataSchema, List.of(),
RECEIVER_STAGE,
+ PinotRelExchangeType.PIPELINE_BREAKER, RelDistribution.Type.SINGLETON,
keys, false, null, false, "absHashCode");
+ }
+
+ private static void process(MailboxSendNode sendNode,
DispatchablePlanMetadata sender,
+ DispatchablePlanMetadata receiver) {
+ DispatchablePlanContext context =
Mockito.mock(DispatchablePlanContext.class);
+ Mockito.when(context.getDispatchablePlanMetadataMap())
+ .thenReturn(Map.of(SENDER_STAGE, sender, RECEIVER_STAGE, receiver));
+ MailboxAssignmentVisitor.INSTANCE.process(sendNode, context);
+ }
+
+ private static MailboxInfo singleMailbox(Map<Integer, Map<Integer,
MailboxInfos>> mailboxesMap, int workerId,
+ int stageId) {
+ List<MailboxInfo> mailboxInfos =
mailboxesMap.get(workerId).get(stageId).getMailboxInfos();
+ assertEquals(mailboxInfos.size(), 1);
+ return mailboxInfos.get(0);
+ }
+
private void verifyAllMailboxInfosSorted(DispatchableSubPlan subPlan, String
query) {
for (DispatchablePlanFragment fragment : subPlan.getQueryStages()) {
List<WorkerMetadata> workerMetadataList =
fragment.getWorkerMetadataList();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]