This is an automated email from the ASF dual-hosted git repository.

Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 594428df356 Fall back to cross-server send for SINGLETON exchange when workers diverge (#18775)
594428df356 is described below

commit 594428df3565f0efe4e29054cb65b5b2e4d3dc0f
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Jun 16 00:26:19 2026 -0700

    Fall back to cross-server send for SINGLETON exchange when workers diverge (#18775)
---
 .../planner/physical/MailboxAssignmentVisitor.java | 147 +++++++++++----------
 .../physical/MailboxAssignmentVisitorTest.java     |  97 ++++++++++++++
 2 files changed, 175 insertions(+), 69 deletions(-)

diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
index 233874017ae..41076e49d0e 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitor.java
@@ -55,84 +55,29 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
         int numSenders = senderServerMap.size();
         int numReceivers = receiverServerMap.size();
         if (sendNode.getDistributionType() == RelDistribution.Type.SINGLETON) {
-          // NOTE: We use SINGLETON to represent local exchange. The actual 
distribution type is determined by the
-          //       parallelism.
-          if (numSenders == numReceivers) {
-            // Send the data to the same instance (same worker id) with 
SINGLETON distribution type
-            for (int workerId = 0; workerId < numSenders; workerId++) {
-              QueryServerInstance senderServer = senderServerMap.get(workerId);
-              QueryServerInstance receiverServer = 
receiverServerMap.get(workerId);
-              Preconditions.checkState(senderServer.equals(receiverServer),
-                  "Got different server for SINGLETON distribution type for 
worker id: %s, sender: %s, receiver: %s",
-                  workerId, senderServer, receiverServer);
-              MailboxInfos mailboxInfos = new SharedMailboxInfos(
-                  new MailboxInfo(senderServer.getHostname(), 
senderServer.getQueryMailboxPort(), List.of(workerId)));
-              senderMailboxesMap.computeIfAbsent(workerId, k -> new 
HashMap<>()).put(receiverStageId, mailboxInfos);
-              receiverMailboxesMap.computeIfAbsent(workerId, k -> new 
HashMap<>()).put(senderStageId, mailboxInfos);
-            }
-          } else {
-            // Hash distribute the data to multiple receivers on the same 
server instance
+          // NOTE: We use SINGLETON to represent a local exchange. The actual 
distribution type is determined by the
+          //       parallelism: 1-to-1 when sender and receiver have the same 
number of workers, otherwise the data is
+          //       hash distributed to the parallel receivers on each server. 
computeDirectExchange handles the
+          //       co-location assumption and its cross-server fallback.
+          if (numSenders != numReceivers) {
+            // Local exchange with parallelism: hash distribute to the 
parallel receivers, so keys are required.
             // TODO: Support local exchange with parallelism but no key
             Preconditions.checkState(!sendNode.getKeys().isEmpty(), "Local 
exchange with parallelism requires keys");
             
sendNode.setDistributionType(RelDistribution.Type.HASH_DISTRIBUTED);
-
             Preconditions.checkState(numReceivers % numSenders == 0,
                 "Number of receivers: %s should be a multiple of number of 
senders: %s for local exchange",
                 numReceivers, numSenders);
-            int parallelism = numReceivers / numSenders;
-            int receiverWorkerId = 0;
-            for (int senderWorkerId = 0; senderWorkerId < numSenders; 
senderWorkerId++) {
-              QueryServerInstance senderServer = 
senderServerMap.get(senderWorkerId);
-              QueryServerInstance receiverServer = 
receiverServerMap.get(receiverWorkerId);
-              Preconditions.checkState(senderServer.equals(receiverServer),
-                  "Got different server for local exchange, sender %s: %s, 
receiver %s: %s", senderWorkerId,
-                  senderServer, receiverWorkerId, receiverServer);
-              computeDirectExchangeWithParallelism(senderMailboxesMap, 
receiverMailboxesMap, senderStageId,
-                  receiverStageId, senderWorkerId, receiverWorkerId, 
senderServer, receiverServer, parallelism);
-              receiverWorkerId += parallelism;
-            }
           }
+          int parallelism = numReceivers / numSenders;
+          computeDirectExchange(senderMailboxesMap, receiverMailboxesMap, 
senderStageId, receiverStageId,
+              senderServerMap, receiverServerMap, numSenders, parallelism);
         } else if (senderMetadata.isPrePartitioned() && 
isDirectExchangeCompatible(senderMetadata, receiverMetadata)) {
-          // Direct exchange possible:
-          // - Without parallelism:
-          //   Send the data to the worker with the same worker id (not 
necessary the same instance), 1-to-1 mapping
-          // - With parallelism:
-          //   Fanout based on parallelism from each sender workerID to 
sequentially increment receiver workerIDs
+          // Direct exchange: the data is already pre-partitioned, so send it 
1-to-1 to the worker with the same worker
+          // id (with parallelism, fan out each sender worker to a contiguous 
range of receiver workers). The
+          // co-location handling is the same as SINGLETON, see 
computeDirectExchange.
           int parallelism = numReceivers / numSenders;
-          if (parallelism == 1) {
-            // 1-to-1 mapping
-            for (int workerId = 0; workerId < numSenders; workerId++) {
-              QueryServerInstance senderServer = senderServerMap.get(workerId);
-              QueryServerInstance receiverServer = 
receiverServerMap.get(workerId);
-              List<Integer> workerIds = List.of(workerId);
-              MailboxInfos senderMailboxInfos;
-              MailboxInfos receiverMailboxInfos;
-              if (senderServer.equals(receiverServer)) {
-                senderMailboxInfos = new SharedMailboxInfos(
-                    new MailboxInfo(senderServer.getHostname(), 
senderServer.getQueryMailboxPort(), workerIds));
-                receiverMailboxInfos = senderMailboxInfos;
-              } else {
-                senderMailboxInfos = new MailboxInfos(
-                    new MailboxInfo(senderServer.getHostname(), 
senderServer.getQueryMailboxPort(), workerIds));
-                receiverMailboxInfos = new MailboxInfos(
-                    new MailboxInfo(receiverServer.getHostname(), 
receiverServer.getQueryMailboxPort(), workerIds));
-              }
-              senderMailboxesMap.computeIfAbsent(workerId, k -> new 
HashMap<>())
-                  .put(receiverStageId, receiverMailboxInfos);
-              receiverMailboxesMap.computeIfAbsent(workerId, k -> new 
HashMap<>())
-                  .put(senderStageId, senderMailboxInfos);
-            }
-          } else {
-            // 1-to-<parallelism> mapping
-            int receiverWorkerId = 0;
-            for (int senderWorkerId = 0; senderWorkerId < numSenders; 
senderWorkerId++) {
-              QueryServerInstance senderServer = 
senderServerMap.get(senderWorkerId);
-              QueryServerInstance receiverServer = 
receiverServerMap.get(receiverWorkerId);
-              computeDirectExchangeWithParallelism(senderMailboxesMap, 
receiverMailboxesMap, senderStageId,
-                  receiverStageId, senderWorkerId, receiverWorkerId, 
senderServer, receiverServer, parallelism);
-              receiverWorkerId += parallelism;
-            }
-          }
+          computeDirectExchange(senderMailboxesMap, receiverMailboxesMap, 
senderStageId, receiverStageId,
+              senderServerMap, receiverServerMap, numSenders, parallelism);
         } else {
           // For other exchange types, send the data to all the instances in 
the receiver fragment
           // TODO: Add support for more exchange types
@@ -144,6 +89,70 @@ public class MailboxAssignmentVisitor extends DefaultPostOrderTraversalVisitor<V
     return null;
   }
 
+  /// Wires the mailboxes for a direct exchange where sender worker `i` sends 
to receiver worker `i` (or, with
+  /// `parallelism` > 1, fans out to a contiguous range of `parallelism` 
receiver workers). Shared by the SINGLETON
+  /// (local exchange) and pre-partitioned exchange paths.
+  ///
+  /// Sender worker `i` and receiver worker `i` are normally co-located on the 
same server, because the receiver worker
+  /// map is derived from the sender's (see 
`WorkerManager#assignWorkersForLocalExchange`). When they are co-located the
+  /// data is exchanged in-process via a single shared local mailbox with no 
network hop.
+  ///
+  /// They are not guaranteed to be co-located, so this does not assert it. A 
colocated semi-join is the exception:
+  /// `PinotJoinToDynamicBroadcastRule` emits a SINGLETON `PIPELINE_BREAKER` 
exchange on the build side purely from the
+  /// `is_colocated_by_join_keys` hint, and the build (sender) leaf and join 
(receiver) leaf are routed independently
+  /// per table. During a rolling restart some segments are temporarily not 
queryable, so each side may reroute a
+  /// partition to a different replica, leaving worker `i` on different 
servers. Rather than failing the query, we fall
+  /// back to a cross-server send: the exchange stays correct because worker 
id still maps to the same partition on both
+  /// sides, and we only lose locality (one extra network hop) until routing 
re-stabilizes.
+  private void computeDirectExchange(Map<Integer, Map<Integer, MailboxInfos>> 
senderMailboxesMap,
+      Map<Integer, Map<Integer, MailboxInfos>> receiverMailboxesMap, Integer 
senderStageId, Integer receiverStageId,
+      Map<Integer, QueryServerInstance> senderServerMap, Map<Integer, 
QueryServerInstance> receiverServerMap,
+      int numSenders, int parallelism) {
+    if (parallelism == 1) {
+      // 1-to-1 mapping
+      for (int workerId = 0; workerId < numSenders; workerId++) {
+        QueryServerInstance senderServer = senderServerMap.get(workerId);
+        QueryServerInstance receiverServer = receiverServerMap.get(workerId);
+        List<Integer> workerIds = List.of(workerId);
+        MailboxInfos senderMailboxInfos;
+        MailboxInfos receiverMailboxInfos;
+        if (senderServer.equals(receiverServer)) {
+          // Co-located: share one local mailbox, exchanged in-process with no 
network hop.
+          MailboxInfos sharedMailboxInfos = new SharedMailboxInfos(
+              new MailboxInfo(senderServer.getHostname(), 
senderServer.getQueryMailboxPort(), workerIds));
+          senderMailboxInfos = sharedMailboxInfos;
+          receiverMailboxInfos = sharedMailboxInfos;
+        } else {
+          // Not co-located: cross-server send via separate sender and 
receiver mailboxes.
+          // TODO: Match the sender and receiver worker assignments upfront so 
co-located data never crosses the wire.
+          //   The sender and receiver leaves are routed independently per 
table today; co-routing them onto a shared
+          //   worker-to-server map (picking, per partition, a server that 
hosts the partition for both sides) would
+          //   keep this exchange local even during a rolling restart, falling 
back to a cross-server send only for
+          //   partitions that genuinely cannot be co-located. See 
PinotJoinToDynamicBroadcastRule and WorkerManager's
+          //   leaf worker assignment.
+          senderMailboxInfos = new MailboxInfos(
+              new MailboxInfo(senderServer.getHostname(), 
senderServer.getQueryMailboxPort(), workerIds));
+          receiverMailboxInfos = new MailboxInfos(
+              new MailboxInfo(receiverServer.getHostname(), 
receiverServer.getQueryMailboxPort(), workerIds));
+        }
+        senderMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
+            .put(receiverStageId, receiverMailboxInfos);
+        receiverMailboxesMap.computeIfAbsent(workerId, k -> new HashMap<>())
+            .put(senderStageId, senderMailboxInfos);
+      }
+    } else {
+      // 1-to-<parallelism> mapping
+      int receiverWorkerId = 0;
+      for (int senderWorkerId = 0; senderWorkerId < numSenders; 
senderWorkerId++) {
+        QueryServerInstance senderServer = senderServerMap.get(senderWorkerId);
+        QueryServerInstance receiverServer = 
receiverServerMap.get(receiverWorkerId);
+        computeDirectExchangeWithParallelism(senderMailboxesMap, 
receiverMailboxesMap, senderStageId,
+            receiverStageId, senderWorkerId, receiverWorkerId, senderServer, 
receiverServer, parallelism);
+        receiverWorkerId += parallelism;
+      }
+    }
+  }
+
   private void computeDirectExchangeWithParallelism(Map<Integer, Map<Integer, 
MailboxInfos>> senderMailboxesMap,
       Map<Integer, Map<Integer, MailboxInfos>> receiverMailboxesMap, Integer 
senderStageId, Integer receiverStageId,
       int senderWorkerId, int receiverWorkerId, QueryServerInstance 
senderServer, QueryServerInstance receiverServer,
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitorTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitorTest.java
index 9df5eae9cac..6c3ec80ab3b 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitorTest.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/planner/physical/MailboxAssignmentVisitorTest.java
@@ -21,12 +21,22 @@ package org.apache.pinot.query.planner.physical;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.pinot.calcite.rel.logical.PinotRelExchangeType;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.query.QueryEnvironmentTestBase;
+import org.apache.pinot.query.planner.plannode.MailboxSendNode;
 import org.apache.pinot.query.routing.MailboxInfo;
 import org.apache.pinot.query.routing.MailboxInfos;
+import org.apache.pinot.query.routing.QueryServerInstance;
+import org.apache.pinot.query.routing.SharedMailboxInfos;
 import org.apache.pinot.query.routing.WorkerMetadata;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 
@@ -69,6 +79,93 @@ public class MailboxAssignmentVisitorTest extends QueryEnvironmentTestBase {
     verifyAllMailboxInfosSorted(subPlan, query);
   }
 
+  private static final int SENDER_STAGE = 1;
+  private static final int RECEIVER_STAGE = 0;
+
+  /// A SINGLETON local exchange where sender worker `i` and receiver worker 
`i` land on different servers (as can
+  /// happen for a colocated semi-join during a rolling restart) must not 
fail: it falls back to a cross-server send
+  /// for the diverged worker while keeping the co-located worker local.
+  @Test
+  public void testSingletonFallsBackToCrossServerWhenWorkersDiverge() {
+    // Worker 0 co-located on A; worker 1 diverged (sender on B, receiver on 
C).
+    DispatchablePlanMetadata sender = metadata(Map.of(0, server("A"), 1, 
server("B")));
+    DispatchablePlanMetadata receiver = metadata(Map.of(0, server("A"), 1, 
server("C")));
+    process(singletonSendNode(List.of()), sender, receiver);
+
+    Map<Integer, Map<Integer, MailboxInfos>> senderMailboxes = 
sender.getWorkerIdToMailboxesMap();
+    Map<Integer, Map<Integer, MailboxInfos>> receiverMailboxes = 
receiver.getWorkerIdToMailboxesMap();
+
+    // Worker 0 is co-located: a single shared local mailbox on host_A on both 
sides.
+    assertTrue(senderMailboxes.get(0).get(RECEIVER_STAGE) instanceof 
SharedMailboxInfos);
+    assertEquals(singleMailbox(senderMailboxes, 0, 
RECEIVER_STAGE).getHostname(), "host_A");
+    assertEquals(singleMailbox(receiverMailboxes, 0, 
SENDER_STAGE).getHostname(), "host_A");
+
+    // Worker 1 diverged: cross-server send, not a shared mailbox. The sender 
sends to the receiver's server (C) and
+    // the receiver reads from the sender's server (B).
+    assertFalse(senderMailboxes.get(1).get(RECEIVER_STAGE) instanceof 
SharedMailboxInfos);
+    assertEquals(singleMailbox(senderMailboxes, 1, 
RECEIVER_STAGE).getHostname(), "host_C");
+    assertEquals(singleMailbox(receiverMailboxes, 1, 
SENDER_STAGE).getHostname(), "host_B");
+  }
+
+  /// An unequal-but-non-multiple worker count (2 senders, 3 receivers) must 
be rejected rather than rounding the
+  /// parallelism down to 1 and silently dropping the extra receiver.
+  @Test(expectedExceptions = IllegalStateException.class,
+      expectedExceptionsMessageRegExp = ".*multiple of number of senders.*")
+  public void testSingletonRejectsNonMultipleReceiverCount() {
+    DispatchablePlanMetadata sender = metadata(Map.of(0, server("A"), 1, 
server("B")));
+    DispatchablePlanMetadata receiver = metadata(Map.of(0, server("A"), 1, 
server("B"), 2, server("C")));
+    process(singletonSendNode(List.of(0)), sender, receiver);
+  }
+
+  /// A SINGLETON local exchange with parallelism (more receivers than 
senders) does not assert co-location either: it
+  /// rewrites the distribution to HASH and fans each sender worker out to its 
receiver workers, even cross-server.
+  @Test
+  public void testSingletonWithParallelismAllowsCrossServer() {
+    // 1 sender on A, 2 receivers on B (parallelism 2), so the fan-out is 
cross-server.
+    DispatchablePlanMetadata sender = metadata(Map.of(0, server("A")));
+    DispatchablePlanMetadata receiver = metadata(Map.of(0, server("B"), 1, 
server("B")));
+    MailboxSendNode sendNode = singletonSendNode(List.of(0));
+    process(sendNode, sender, receiver);
+
+    assertEquals(sendNode.getDistributionType(), 
RelDistribution.Type.HASH_DISTRIBUTED);
+    MailboxInfo senderToReceiver = 
singleMailbox(sender.getWorkerIdToMailboxesMap(), 0, RECEIVER_STAGE);
+    assertEquals(senderToReceiver.getHostname(), "host_B");
+    assertEquals(senderToReceiver.getWorkerIds(), List.of(0, 1));
+    assertEquals(singleMailbox(receiver.getWorkerIdToMailboxesMap(), 0, 
SENDER_STAGE).getHostname(), "host_A");
+    assertEquals(singleMailbox(receiver.getWorkerIdToMailboxesMap(), 1, 
SENDER_STAGE).getHostname(), "host_A");
+  }
+
+  private static QueryServerInstance server(String id) {
+    return new QueryServerInstance(id, "host_" + id, 1, 1);
+  }
+
+  private static DispatchablePlanMetadata metadata(Map<Integer, 
QueryServerInstance> workerIdToServerInstanceMap) {
+    DispatchablePlanMetadata metadata = new DispatchablePlanMetadata();
+    metadata.setWorkerIdToServerInstanceMap(workerIdToServerInstanceMap);
+    return metadata;
+  }
+
+  private static MailboxSendNode singletonSendNode(List<Integer> keys) {
+    DataSchema dataSchema = new DataSchema(new String[]{"col"}, new 
ColumnDataType[]{ColumnDataType.INT});
+    return new MailboxSendNode(SENDER_STAGE, dataSchema, List.of(), 
RECEIVER_STAGE,
+        PinotRelExchangeType.PIPELINE_BREAKER, RelDistribution.Type.SINGLETON, 
keys, false, null, false, "absHashCode");
+  }
+
+  private static void process(MailboxSendNode sendNode, 
DispatchablePlanMetadata sender,
+      DispatchablePlanMetadata receiver) {
+    DispatchablePlanContext context = 
Mockito.mock(DispatchablePlanContext.class);
+    Mockito.when(context.getDispatchablePlanMetadataMap())
+        .thenReturn(Map.of(SENDER_STAGE, sender, RECEIVER_STAGE, receiver));
+    MailboxAssignmentVisitor.INSTANCE.process(sendNode, context);
+  }
+
+  private static MailboxInfo singleMailbox(Map<Integer, Map<Integer, 
MailboxInfos>> mailboxesMap, int workerId,
+      int stageId) {
+    List<MailboxInfo> mailboxInfos = 
mailboxesMap.get(workerId).get(stageId).getMailboxInfos();
+    assertEquals(mailboxInfos.size(), 1);
+    return mailboxInfos.get(0);
+  }
+
   private void verifyAllMailboxInfosSorted(DispatchableSubPlan subPlan, String 
query) {
     for (DispatchablePlanFragment fragment : subPlan.getQueryStages()) {
       List<WorkerMetadata> workerMetadataList = 
fragment.getWorkerMetadataList();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to